This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 803b738636 IGNITE-19451 Prohibit some types from being Marshallable 
(#2163)
803b738636 is described below

commit 803b7386360743f8be524f9554a81fc8980d3ea2
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Thu Jun 8 15:57:04 2023 +0300

    IGNITE-19451 Prohibit some types from being Marshallable (#2163)
---
 .../internal/network/processor/TypeUtils.java      | 21 ++++-
 .../messages/MarshallableTypesBlackList.java       | 37 +++++++-
 .../processor/messages/MessageImplGenerator.java   |  6 +-
 .../src/main/resources/marshallable.blacklist      |  4 +
 .../processor/MarshallableBlacklistTest.java       | 36 ++++++--
 .../processor/TransferableObjectProcessorTest.java | 13 ---
 .../src/test/resources/marshallable.blacklist      |  1 +
 .../message/LeaseGrantedMessage.java               | 17 ++--
 .../negotiation/LeaseNegotiator.java               |  4 +-
 .../replicator/PlacementDriverReplicaSideTest.java |  4 +-
 .../sql/engine/schema/IgniteTableImpl.java         | 17 +++-
 .../ignite/distributed/ReplicaUnavailableTest.java | 10 +--
 .../table/distributed/TableMessageGroup.java       |  6 ++
 .../replication/request/BinaryTupleMessage.java}   | 28 ++++--
 .../request/MultipleRowReplicaRequest.java         | 21 ++++-
 .../request/ReadOnlyReplicaRequest.java            |  8 +-
 .../request/ScanRetrieveBatchReplicaRequest.java   | 12 +--
 .../request/SingleRowReplicaRequest.java           |  9 +-
 .../replication/request/SwapRowReplicaRequest.java | 16 +++-
 .../replicator/PartitionReplicaListener.java       | 23 +++--
 .../distributed/storage/InternalTableImpl.java     | 96 +++++++++++++--------
 .../PartitionReplicaListenerIndexLockingTest.java  |  5 +-
 .../replication/PartitionReplicaListenerTest.java  | 99 ++++++++++++----------
 23 files changed, 335 insertions(+), 158 deletions(-)

diff --git 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
index 5f3cf13735..e5371672d6 100644
--- 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
+++ 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/TypeUtils.java
@@ -46,6 +46,14 @@ public class TypeUtils {
         this.elements = processingEnvironment.getElementUtils();
     }
 
+    public Types types() {
+        return types;
+    }
+
+    public Elements elements() {
+        return elements;
+    }
+
     /**
      * Returns {@code true} if the <i>erasure</i> of the given types are 
actually the same type.
      *
@@ -69,7 +77,18 @@ public class TypeUtils {
     public boolean isSubType(TypeMirror type1, Class<?> type2) {
         TypeMirror type2Mirror = typeMirrorFromClass(type2);
 
-        return types.isSubtype(erasure(type1), erasure(type2Mirror));
+        return isSubType(type1, type2Mirror);
+    }
+
+    /**
+     * Returns {@code true} if the <i>erasure</i> of the first type is a 
subtype of the second type.
+     *
+     * @param type1 first type (represented by a mirror)
+     * @param type2 second type (represented by a mirror)
+     * @return {@code true} if the erasure of the first type is a subtype of 
the second type, {@code false} otherwise.
+     */
+    public boolean isSubType(TypeMirror type1, TypeMirror type2) {
+        return types.isSubtype(erasure(type1), erasure(type2));
     }
 
     /**
diff --git 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
index a976b1406b..698f1045a2 100644
--- 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
+++ 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MarshallableTypesBlackList.java
@@ -17,17 +17,25 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import javax.lang.model.element.TypeElement;
 import javax.lang.model.type.ArrayType;
 import javax.lang.model.type.DeclaredType;
 import javax.lang.model.type.TypeMirror;
 import javax.lang.model.util.SimpleTypeVisitor9;
+import org.apache.ignite.internal.network.processor.ProcessingException;
 import org.apache.ignite.internal.network.processor.TypeUtils;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
@@ -38,6 +46,9 @@ import org.apache.ignite.network.annotations.Marshallable;
  * {@code Marshallable} only if it is not supported by the Direct Marshaller 
natively.
  */
 public class MarshallableTypesBlackList {
+    /** Name of the file containing a newline-separated list of blacklisted 
class names. */
+    private static final String BLACKLIST_FILE_NAME = "marshallable.blacklist";
+
     /** Types supported by the Direct Marshaller. */
     public static final List<Class<?>> NATIVE_TYPES = List.of(
             // Primitive type wrappers
@@ -79,10 +90,28 @@ public class MarshallableTypesBlackList {
     private static class TypeVisitor extends SimpleTypeVisitor9<Boolean, Void> 
{
         private final TypeUtils typeUtils;
 
+        private final List<TypeMirror> resourcesBlacklist;
+
         TypeVisitor(TypeUtils typeUtils) {
             super(false);
 
             this.typeUtils = typeUtils;
+            this.resourcesBlacklist = readBlacklistFromResources();
+        }
+
+        private List<TypeMirror> readBlacklistFromResources() {
+            try (
+                    InputStream is = 
getClass().getClassLoader().getResourceAsStream(BLACKLIST_FILE_NAME);
+                    BufferedReader reader = new BufferedReader(new 
InputStreamReader(is))
+            ) {
+                return reader.lines()
+                        .map(typeUtils.elements()::getTypeElement)
+                        .filter(Objects::nonNull)
+                        .map(TypeElement::asType)
+                        .collect(Collectors.toList());
+            } catch (IOException e) {
+                throw new ProcessingException("Unable to read " + 
BLACKLIST_FILE_NAME, e);
+            }
         }
 
         @Override
@@ -97,11 +126,17 @@ public class MarshallableTypesBlackList {
                 return t.getTypeArguments().stream().anyMatch(this::visit);
             }
 
-            return !isSameType(NATIVE_TYPES, t) && !typeUtils.isSubType(t, 
NetworkMessage.class);
+            return !isSameType(NATIVE_TYPES, t)
+                    && !typeUtils.isSubType(t, NetworkMessage.class)
+                    && !isSubType(resourcesBlacklist, t);
         }
 
         private boolean isSameType(List<Class<?>> types, DeclaredType type) {
             return types.stream().anyMatch(cls -> typeUtils.isSameType(type, 
cls));
         }
+
+        private boolean isSubType(List<TypeMirror> types, DeclaredType type) {
+            return types.stream().anyMatch(cls -> typeUtils.isSubType(type, 
cls));
+        }
     }
 }
diff --git 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index a7693f4e4b..091b9c3489 100644
--- 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -122,12 +122,12 @@ public class MessageImplGenerator {
 
             if (isMarshallable && 
!marshallableTypesBlackList.canBeMarshallable(getterType)) {
                 String error = String.format(
-                        "\"%s\" field is marked as @Marshallable but this type 
is supported by native serialization, "
-                                + "remove this annotation from the field",
+                        "\"%s\" field is marked as @Marshallable but this type 
is either directly supported by native serialization "
+                                + "or is prohibited by a blacklist, remove 
this annotation from the field",
                         getterName
                 );
 
-                throw new ProcessingException(error);
+                throw new ProcessingException(error, null, getter);
             }
 
             FieldSpec.Builder fieldBuilder = 
FieldSpec.builder(getterReturnType, getterName)
diff --git 
a/modules/network-annotation-processor/src/main/resources/marshallable.blacklist
 
b/modules/network-annotation-processor/src/main/resources/marshallable.blacklist
new file mode 100644
index 0000000000..93280f77dd
--- /dev/null
+++ 
b/modules/network-annotation-processor/src/main/resources/marshallable.blacklist
@@ -0,0 +1,4 @@
+org.apache.ignite.internal.schema.BinaryRow
+org.apache.ignite.internal.schema.BinaryTuple
+org.apache.ignite.internal.schema.BinaryTuplePrefix
+org.apache.ignite.internal.hlc.HybridTimestamp
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
index a2f0e992d1..3c6783f581 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/MarshallableBlacklistTest.java
@@ -27,11 +27,14 @@ import com.google.testing.compile.Compiler;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Stream;
 import javax.tools.JavaFileObject;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import 
org.apache.ignite.internal.network.processor.messages.MarshallableTypesBlackList;
+import org.apache.ignite.network.NetworkMessage;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -75,6 +78,7 @@ public class MarshallableBlacklistTest {
         return List.of(
                 "package org.apache.ignite.internal.network.processor;",
 
+                "import 
org.apache.ignite.internal.network.message.ScaleCubeMessage;",
                 "import org.apache.ignite.network.NetworkMessage;",
                 "import org.apache.ignite.network.annotations.Transferable;",
                 "import org.apache.ignite.network.annotations.Marshallable;",
@@ -102,7 +106,7 @@ public class MarshallableBlacklistTest {
         Compilation compilation = compile(marshallableFieldSourceCode(type));
 
         assertThat(compilation).hadErrorContaining(
-                "\"foo\" field is marked as @Marshallable but this type is 
supported by native serialization"
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
         );
     }
 
@@ -112,7 +116,7 @@ public class MarshallableBlacklistTest {
         Compilation compilation = compile(marshallableArraySourceCode(type));
 
         assertThat(compilation).hadErrorContaining(
-                "\"foo\" field is marked as @Marshallable but this type is 
supported by native serialization"
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
         );
     }
 
@@ -122,7 +126,7 @@ public class MarshallableBlacklistTest {
         Compilation compilation = 
compile(marshallableCollectionSourceCode(collectionType, type));
 
         assertThat(compilation).hadErrorContaining(
-                "\"foo\" field is marked as @Marshallable but this type is 
supported by native serialization"
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
         );
     }
 
@@ -145,7 +149,7 @@ public class MarshallableBlacklistTest {
         Compilation compilation = 
compile(marshallableMapSourceCode(Integer.class, String.class));
 
         assertThat(compilation).hadErrorContaining(
-                "\"foo\" field is marked as @Marshallable but this type is 
supported by native serialization"
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
         );
     }
 
@@ -154,7 +158,7 @@ public class MarshallableBlacklistTest {
         Compilation compilation = 
compile(marshallableNestedMapSourceCode(Integer.class, String.class));
 
         assertThat(compilation).hadErrorContaining(
-                "\"foo\" field is marked as @Marshallable but this type is 
supported by native serialization"
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
         );
     }
 
@@ -165,6 +169,28 @@ public class MarshallableBlacklistTest {
         assertThat(compilation).succeededWithoutWarnings();
     }
 
+    /**
+     * Tests that compilation fails if message's field is both {@link 
NetworkMessage} and marked
+     * as {@link org.apache.ignite.network.annotations.Marshallable}.
+     */
+    @Test
+    void testMessageWithMarshallableMessage() {
+        Compilation compilation = 
compile(marshallableFieldSourceCode(ScaleCubeMessage.class));
+
+        assertThat(compilation).hadErrorContaining(
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
+        );
+    }
+
+    @Test
+    void testMessageWithBlockedByFileType() {
+        Compilation compilation = 
compile(marshallableFieldSourceCode(Random.class));
+
+        assertThat(compilation).hadErrorContaining(
+                "\"foo\" field is marked as @Marshallable but this type is 
either directly supported by native serialization"
+        );
+    }
+
     private Compilation compile(List<String> code) {
         return compiler.compile(
                 
forSourceLines("org.apache.ignite.internal.network.processor.TestMessage", 
code),
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
index 56c58442dc..ec119c6ab5 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/processor/TransferableObjectProcessorTest.java
@@ -249,19 +249,6 @@ public class TransferableObjectProcessorTest {
         );
     }
 
-    /**
-     * Tests that compilation fails if message's field is both {@link 
NetworkMessage} and marked
-     * as {@link org.apache.ignite.network.annotations.Marshallable}.
-     */
-    @Test
-    void testFieldBothNetworkMessageAndMarkedMarshallable() {
-        Compilation compilation = 
compile("MessageWithMarshallableNetworkMessageField");
-
-        assertThat(compilation).hadErrorContaining(
-                "\"msgField\" field is marked as @Marshallable but this type 
is supported by native serialization"
-        );
-    }
-
     /**
      * Compiles the given network message.
      */
diff --git a/modules/network/src/test/resources/marshallable.blacklist 
b/modules/network/src/test/resources/marshallable.blacklist
new file mode 100644
index 0000000000..c33559224b
--- /dev/null
+++ b/modules/network/src/test/resources/marshallable.blacklist
@@ -0,0 +1 @@
+java.util.Random
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
index ef009b0001..87ab398299 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
@@ -17,8 +17,9 @@
 
 package org.apache.ignite.internal.placementdriver.message;
 
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,11 +27,17 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE)
 public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage {
-    @Marshallable
-    HybridTimestamp leaseStartTime();
+    long leaseStartTimeLong();
+
+    default HybridTimestamp leaseStartTime() {
+        return hybridTimestamp(leaseStartTimeLong());
+    }
+
+    long leaseExpirationTimeLong();
 
-    @Marshallable
-    HybridTimestamp leaseExpirationTime();
+    default HybridTimestamp leaseExpirationTime() {
+        return hybridTimestamp(leaseExpirationTimeLong());
+    }
 
     boolean force();
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index 70fdd0e3d7..2d87ab0eae 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -76,8 +76,8 @@ public class LeaseNegotiator {
                         lease.getLeaseholder(),
                         PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
                                 .groupId(groupId)
-                                .leaseStartTime(lease.getStartTime())
-                                .leaseExpirationTime(lease.getExpirationTime())
+                                
.leaseStartTimeLong(lease.getStartTime().longValue())
+                                
.leaseExpirationTimeLong(lease.getExpirationTime().longValue())
                                 .force(force)
                                 .build(),
                         leaseInterval)
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index 5fcc8602ff..d5ffb24812 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -145,8 +145,8 @@ public class PlacementDriverReplicaSideTest {
     ) {
         PlacementDriverReplicaMessage msg = MSG_FACTORY.leaseGrantedMessage()
                 .groupId(GRP_ID)
-                .leaseStartTime(leaseStartTime)
-                .leaseExpirationTime(leaseExpirationTime)
+                .leaseStartTimeLong(leaseStartTime.longValue())
+                .leaseExpirationTimeLong(leaseExpirationTime.longValue())
                 .force(force)
                 .build();
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 92f1300ce2..89617b514c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -356,7 +357,7 @@ public class IgniteTableImpl extends AbstractTable 
implements IgniteTable, Updat
             ReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     .commitPartitionId(commitPartitionId)
-                    .binaryRows(partToRows.getValue())
+                    
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_UPSERT_ALL)
@@ -369,6 +370,16 @@ public class IgniteTableImpl extends AbstractTable 
implements IgniteTable, Updat
         return CompletableFuture.allOf(futures);
     }
 
+    private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow> 
rows) {
+        var result = new ArrayList<ByteBuffer>(rows.size());
+
+        for (BinaryRow row : rows) {
+            result.add(row.byteBuffer());
+        }
+
+        return result;
+    }
+
     /** {@inheritDoc} */
     @Override
     public <RowT> CompletableFuture<?> insertAll(
@@ -401,7 +412,7 @@ public class IgniteTableImpl extends AbstractTable 
implements IgniteTable, Updat
             ReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     .commitPartitionId(commitPartitionId)
-                    .binaryRows(partToRows.getValue())
+                    
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_INSERT_ALL)
@@ -464,7 +475,7 @@ public class IgniteTableImpl extends AbstractTable 
implements IgniteTable, Updat
             ReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     .commitPartitionId(commitPartitionId)
-                    .binaryRows(partToRows.getValue())
+                    
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_DELETE_ALL)
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 6c85625e8e..f92dc3e9be 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -47,7 +48,6 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -139,7 +139,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRow(createKeyValueRow(1L, 1L))
+                .binaryRowBytes(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -181,7 +181,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRow(createKeyValueRow(1L, 1L))
+                .binaryRowBytes(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -207,12 +207,12 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
         assertTrue(e1.getCause() instanceof ReplicaUnavailableException, 
e1.toString());
     }
 
-    private static Row createKeyValueRow(long id, long value) {
+    private static ByteBuffer createKeyValueRow(long id, long value) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA);
 
         rowBuilder.appendLong(id);
         rowBuilder.appendLong(value);
 
-        return new Row(SCHEMA, rowBuilder.build());
+        return rowBuilder.build().byteBuffer();
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 88373fd5e5..4ccb9ab6e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
@@ -137,6 +138,11 @@ public interface TableMessageGroup {
      */
     short SNAPSHOT_TX_DATA_RESPONSE = 16;
 
+    /**
+     * Message type for {@link BinaryTupleMessage}.
+     */
+    short BINARY_TUPLE = 17;
+
     /**
      * Message types for Table module RAFT commands.
      */
diff --git 
a/modules/network/src/test/resources/org/apache/ignite/internal/network/processor/MessageWithMarshallableNetworkMessageField.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryTupleMessage.java
similarity index 54%
rename from 
modules/network/src/test/resources/org/apache/ignite/internal/network/processor/MessageWithMarshallableNetworkMessageField.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryTupleMessage.java
index 07f49676b0..a9856ccb09 100644
--- 
a/modules/network/src/test/resources/org/apache/ignite/internal/network/processor/MessageWithMarshallableNetworkMessageField.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryTupleMessage.java
@@ -15,15 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.processor;
+package org.apache.ignite.internal.table.distributed.replication.request;
 
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
-@Transferable(1)
-public interface MessageWithMarshallableNetworkMessageField extends 
NetworkMessage {
-    @Marshallable
-    ScaleCubeMessage msgField();
+/**
+ * Message for transferring a {@link BinaryTuple} or a {@link 
BinaryTuplePrefix}.
+ */
+@Transferable(TableMessageGroup.BINARY_TUPLE)
+public interface BinaryTupleMessage extends NetworkMessage {
+    ByteBuffer tuple();
+
+    int elementCount();
+
+    default BinaryTuple asBinaryTuple() {
+        return new BinaryTuple(elementCount(), tuple());
+    }
+
+    default BinaryTuplePrefix asBinaryTuplePrefix() {
+        return new BinaryTuplePrefix(elementCount(), tuple());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
index 85a3e3724f..5b7c39b720 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -27,8 +30,22 @@ import org.apache.ignite.network.annotations.Marshallable;
  * Multiple row replica request.
  */
 public interface MultipleRowReplicaRequest extends ReplicaRequest {
-    @Marshallable
-    Collection<BinaryRow> binaryRows();
+    Collection<ByteBuffer> binaryRowsBytes();
+
+    /**
+     * Deserializes binary row byte buffers into binary rows.
+     */
+    default Collection<BinaryRow> binaryRows() {
+        Collection<ByteBuffer> binaryRowsBytes = binaryRowsBytes();
+
+        var result = new ArrayList<BinaryRow>(binaryRowsBytes.size());
+
+        for (ByteBuffer buffer : binaryRowsBytes) {
+            result.add(new ByteBufferRow(buffer));
+        }
+
+        return result;
+    }
 
     @Marshallable
     RequestType requestType();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
index 5a61b2e23e..d30e6fe243 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyReplicaRequest.java
@@ -19,12 +19,14 @@ package 
org.apache.ignite.internal.table.distributed.replication.request;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Read only replica request.
  */
 public interface ReadOnlyReplicaRequest extends ReplicaRequest {
-    @Marshallable
-    HybridTimestamp readTimestamp();
+    long readTimestampLong();
+
+    default HybridTimestamp readTimestamp() {
+        return HybridTimestamp.hybridTimestamp(readTimestampLong());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
index a0a8ebe030..32876b4d4c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
@@ -19,10 +19,7 @@ package 
org.apache.ignite.internal.table.distributed.replication.request;
 
 import java.util.BitSet;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -46,8 +43,7 @@ public interface ScanRetrieveBatchReplicaRequest extends 
ReplicaRequest {
      * @return Key to search.
      */
     @Nullable
-    @Marshallable
-    BinaryTuple exactKey();
+    BinaryTupleMessage exactKey();
 
     /**
      * Gets a lower bound to choose entries from {@link SortedIndexStorage}. 
Exclusivity is controlled by a {@link
@@ -56,8 +52,7 @@ public interface ScanRetrieveBatchReplicaRequest extends 
ReplicaRequest {
      * @return lower bound.
      */
     @Nullable
-    @Marshallable
-    BinaryTuplePrefix lowerBound();
+    BinaryTupleMessage lowerBoundPrefix();
 
     /**
      * Gets an upper bound to choose entries from {@link SortedIndexStorage}. 
Upper bound. Exclusivity is controlled by a {@link
@@ -66,8 +61,7 @@ public interface ScanRetrieveBatchReplicaRequest extends 
ReplicaRequest {
      * @return upper bound.
      */
     @Nullable
-    @Marshallable
-    BinaryTuplePrefix upperBound();
+    BinaryTupleMessage upperBoundPrefix();
 
     /**
      * Gets control flags for {@link SortedIndexStorage}. {@link 
SortedIndexStorage#GREATER} | {@link SortedIndexStorage#LESS} by default.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
index 4fef3610a1..98a4656483 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
+import java.nio.ByteBuffer;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -26,8 +28,11 @@ import org.apache.ignite.network.annotations.Marshallable;
  * Single-row replica request.
  */
 public interface SingleRowReplicaRequest extends ReplicaRequest {
-    @Marshallable
-    BinaryRow binaryRow();
+    ByteBuffer binaryRowBytes();
+
+    default BinaryRow binaryRow() {
+        return new ByteBufferRow(binaryRowBytes());
+    }
 
     @Marshallable
     RequestType requestType();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
index 7067dc2f35..d088c702d3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
+import java.nio.ByteBuffer;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -26,11 +28,17 @@ import org.apache.ignite.network.annotations.Marshallable;
  * Dual row replica request.
  */
 public interface SwapRowReplicaRequest extends ReplicaRequest {
-    @Marshallable
-    BinaryRow binaryRow();
+    ByteBuffer binaryRowBytes();
 
-    @Marshallable
-    BinaryRow oldBinaryRow();
+    default BinaryRow binaryRow() {
+        return new ByteBufferRow(binaryRowBytes());
+    }
+
+    ByteBuffer oldBinaryRowBytes();
+
+    default BinaryRow oldBinaryRow() {
+        return new ByteBufferRow(oldBinaryRowBytes());
+    }
 
     @Marshallable
     RequestType requestType();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b5533cccfd..1697b4725b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -108,6 +108,7 @@ import 
org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -434,7 +435,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
 
             if (request.exactKey() != null) {
-                assert request.lowerBound() == null && request.upperBound() == 
null : "Index lookup doesn't allow bounds.";
+                assert request.lowerBoundPrefix() == null && 
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
 
                 return safeReadFuture.thenCompose(unused -> 
lookupIndex(request, indexStorage));
             }
@@ -668,7 +669,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
 
             if (request.exactKey() != null) {
-                assert request.lowerBound() == null && request.upperBound() == 
null : "Index lookup doesn't allow bounds.";
+                assert request.lowerBoundPrefix() == null && 
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
 
                 return lookupIndex(request, indexStorage.storage());
             }
@@ -731,7 +732,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(request.transactionId(), 
request.scanId());
 
-        BinaryTuple key = request.exactKey();
+        BinaryTuple key = request.exactKey().asBinaryTuple();
 
         Cursor<RowId> cursor = (Cursor<RowId>) 
cursors.computeIfAbsent(cursorId,
                 id -> indexStorage.get(key));
@@ -755,7 +756,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         Integer indexId = request.indexToUse();
 
-        BinaryTuple exactKey = request.exactKey();
+        BinaryTuple exactKey = request.exactKey().asBinaryTuple();
 
         return lockManager.acquire(txId, new LockKey(indexId), 
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
             return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
@@ -792,8 +793,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         Integer indexId = request.indexToUse();
 
-        BinaryTuplePrefix lowerBound = request.lowerBound();
-        BinaryTuplePrefix upperBound = request.upperBound();
+        BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
+        BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
+
+        BinaryTuplePrefix lowerBound = lowerBoundMessage == null ? null : 
lowerBoundMessage.asBinaryTuplePrefix();
+        BinaryTuplePrefix upperBound = upperBoundMessage == null ? null : 
upperBoundMessage.asBinaryTuplePrefix();
 
         int flags = request.flags();
 
@@ -859,8 +863,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        BinaryTuplePrefix lowerBound = request.lowerBound();
-        BinaryTuplePrefix upperBound = request.upperBound();
+        BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
+        BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
+
+        BinaryTuplePrefix lowerBound = lowerBoundMessage == null ? null : 
lowerBoundMessage.asBinaryTuplePrefix();
+        BinaryTuplePrefix upperBound = upperBoundMessage == null ? null : 
upperBoundMessage.asBinaryTuplePrefix();
 
         int flags = request.flags();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 2c1720379c..ef2914eb6c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -31,6 +31,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.net.ConnectException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -52,6 +53,7 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.Peer;
@@ -70,6 +72,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
@@ -383,9 +386,9 @@ public class InternalTableImpl implements InternalTable {
                 .transactionId(tx.id())
                 .scanId(scanId)
                 .indexToUse(indexId)
-                .exactKey(exactKey)
-                .lowerBound(lowerBound)
-                .upperBound(upperBound)
+                .exactKey(binaryTupleMessage(exactKey))
+                .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+                .upperBoundPrefix(binaryTupleMessage(upperBound))
                 .flags(flags)
                 .columnsToInclude(columnsToInclude)
                 .batchSize(batchSize);
@@ -407,6 +410,17 @@ public class InternalTableImpl implements InternalTable {
         return postEnlist(fut, false, tx);
     }
 
+    private @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable 
BinaryTupleReader binaryTuple) {
+        if (binaryTuple == null) {
+            return null;
+        }
+
+        return tableMessagesFactory.binaryTupleMessage()
+                .tuple(binaryTuple.byteBuffer())
+                .elementCount(binaryTuple.elementCount())
+                .build();
+    }
+
     /**
      * Partition enlisting with retrying.
      *
@@ -513,7 +527,7 @@ public class InternalTableImpl implements InternalTable {
                     tx,
                     (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                             .groupId(groupId)
-                            .binaryRow(keyRow)
+                            .binaryRowBytes(keyRow.byteBuffer())
                             .commitPartitionId(commitPart)
                             .transactionId(txo.id())
                             .term(term)
@@ -535,9 +549,9 @@ public class InternalTableImpl implements InternalTable {
 
         return replicaSvc.invoke(recipientNode, 
tableMessagesFactory.readOnlySingleRowReplicaRequest()
                 .groupId(partGroupId)
-                .binaryRow(keyRow)
+                .binaryRowBytes(keyRow.byteBuffer())
                 .requestType(RequestType.RO_GET)
-                .readTimestamp(readTimestamp)
+                .readTimestampLong(readTimestamp.longValue())
                 .build()
         );
     }
@@ -560,7 +574,7 @@ public class InternalTableImpl implements InternalTable {
                     tx,
                     (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                             .groupId(groupId)
-                            .binaryRows(keyRows0)
+                            .binaryRowsBytes(serializeBinaryRows(keyRows0))
                             .commitPartitionId(commitPart)
                             .transactionId(txo.id())
                             .term(term)
@@ -589,9 +603,9 @@ public class InternalTableImpl implements InternalTable {
 
             CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, 
tableMessagesFactory.readOnlyMultiRowReplicaRequest()
                     .groupId(partGroupId)
-                    .binaryRows(partToRows.getValue())
+                    
.binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
                     .requestType(RequestType.RO_GET_ALL)
-                    .readTimestamp(readTimestamp)
+                    .readTimestampLong(readTimestamp.longValue())
                     .build()
             );
 
@@ -601,6 +615,16 @@ public class InternalTableImpl implements InternalTable {
         return collectMultiRowsResponses(futures);
     }
 
+    private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow> 
rows) {
+        var result = new ArrayList<ByteBuffer>(rows.size());
+
+        for (BinaryRow row : rows) {
+            result.add(row.byteBuffer());
+        }
+
+        return result;
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction 
tx) {
@@ -610,7 +634,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(row)
+                        .binaryRowBytes(row.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_UPSERT)
@@ -627,7 +651,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRows(keyRows0)
+                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_UPSERT_ALL)
@@ -645,7 +669,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(row)
+                        .binaryRowBytes(row.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_UPSERT)
@@ -663,7 +687,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(row)
+                        .binaryRowBytes(row.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT)
@@ -681,7 +705,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRows(keyRows0)
+                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT_ALL)
@@ -699,7 +723,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(row)
+                        .binaryRowBytes(row.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE_IF_EXIST)
@@ -717,8 +741,8 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSwapRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .oldBinaryRow(oldRow)
-                        .binaryRow(newRow)
+                        .oldBinaryRowBytes(oldRow.byteBuffer())
+                        .binaryRowBytes(newRow.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE)
@@ -736,7 +760,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(row)
+                        .binaryRowBytes(row.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_REPLACE)
@@ -754,7 +778,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(keyRow)
+                        .binaryRowBytes(keyRow.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE)
@@ -772,7 +796,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(oldRow)
+                        .binaryRowBytes(oldRow.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT)
@@ -790,7 +814,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRow(row)
+                        .binaryRowBytes(row.byteBuffer())
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_DELETE)
@@ -808,7 +832,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRows(keyRows0)
+                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_ALL)
@@ -829,7 +853,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> 
tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRows(keyRows0)
+                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT_ALL)
@@ -897,15 +921,15 @@ public class InternalTableImpl implements InternalTable {
 
                     ReadOnlyScanRetrieveBatchReplicaRequest request = 
tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
                             .groupId(partGroupId)
-                            .readTimestamp(readTimestamp)
+                            .readTimestampLong(readTimestamp.longValue())
                             // TODO: IGNITE-17666 Close cursor tx finish.
                             .transactionId(txId)
                             .scanId(scanId)
                             .batchSize(batchSize)
                             .indexToUse(indexId)
-                            .exactKey(exactKey)
-                            .lowerBound(lowerBound)
-                            .upperBound(upperBound)
+                            .exactKey(binaryTupleMessage(exactKey))
+                            .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+                            .upperBoundPrefix(binaryTupleMessage(upperBound))
                             .flags(flags)
                             .columnsToInclude(columnsToInclude)
                             .build();
@@ -1010,9 +1034,9 @@ public class InternalTableImpl implements InternalTable {
                             .transactionId(txId)
                             .scanId(scanId)
                             .indexToUse(indexId)
-                            .exactKey(exactKey)
-                            .lowerBound(lowerBound)
-                            .upperBound(upperBound)
+                            .exactKey(binaryTupleMessage(exactKey))
+                            .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+                            .upperBoundPrefix(binaryTupleMessage(upperBound))
                             .flags(flags)
                             .columnsToInclude(columnsToInclude)
                             .batchSize(batchSize)
@@ -1224,7 +1248,7 @@ public class InternalTableImpl implements InternalTable {
      * Enlists a partition.
      *
      * @param partId Partition id.
-     * @param tx     The transaction.
+     * @param tx The transaction.
      * @return The enlist future (then will a leader become known).
      */
     protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int 
partId, InternalTransaction tx) {
@@ -1268,7 +1292,8 @@ public class InternalTableImpl implements InternalTable {
          * The constructor.
          *
          * @param retrieveBatch Closure that gets a new batch from the remote 
replica.
-         * @param onClose The closure will be applied when {@link 
Subscription#cancel} is invoked directly or the cursor is finished.
+         * @param onClose The closure will be applied when {@link 
Subscription#cancel} is invoked directly or the cursor is
+         *         finished.
          */
         PartitionScanPublisher(
                 BiFunction<Long, Integer, 
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
@@ -1429,6 +1454,7 @@ public class InternalTableImpl implements InternalTable {
     }
 
     // TODO: IGNITE-17963 Use smarter logic for recipient node evaluation.
+
     /**
      * Evaluated cluster node for read-only request processing.
      *
@@ -1452,9 +1478,9 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * Casts any exception type to a client exception, wherein {@link 
ReplicationException} and {@link LockException} are wrapped
-     * to {@link TransactionException}, but another exceptions are wrapped to 
a common exception.
-     * The method does not wrap an exception if the exception already inherits 
type of {@link RuntimeException}.
+     * Casts any exception type to a client exception, wherein {@link 
ReplicationException} and {@link LockException} are wrapped to
+     * {@link TransactionException}, but another exceptions are wrapped to a 
common exception. The method does not wrap an exception if the
+     * exception already inherits type of {@link RuntimeException}.
      *
      * @param e An instance exception to cast to client side one.
      * @return {@link IgniteException} An instance of client side exception.
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d679938837..ff830f9d9b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.replication;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
@@ -252,7 +253,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 .term(1L)
                 .commitPartitionId(PARTITION_ID)
                 .transactionId(TRANSACTION_ID)
-                .binaryRow(testBinaryRow)
+                .binaryRowBytes(testBinaryRow.byteBuffer())
                 .requestType(arg.type)
                 .build());
 
@@ -298,7 +299,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 .term(1L)
                 .commitPartitionId(PARTITION_ID)
                 .transactionId(TRANSACTION_ID)
-                .binaryRows(rows)
+                
.binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
                 .requestType(arg.type)
                 .build());
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 44ae746ff6..cf033a7a8d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.replication;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -79,7 +80,6 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -116,6 +116,7 @@ import 
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
 import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
@@ -512,8 +513,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
-                .readTimestamp(clock.now())
-                .binaryRow(testBinaryKey)
+                .readTimestampLong(clock.nowLong())
+                .binaryRowBytes(testBinaryKey.byteBuffer())
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -535,8 +536,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
-                .readTimestamp(clock.now())
-                .binaryRow(testBinaryKey)
+                .readTimestampLong(clock.nowLong())
+                .binaryRowBytes(testBinaryKey.byteBuffer())
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -558,8 +559,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
-                .readTimestamp(clock.now())
-                .binaryRow(testBinaryKey)
+                .readTimestampLong(clock.nowLong())
+                .binaryRowBytes(testBinaryKey.byteBuffer())
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -580,8 +581,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
-                .readTimestamp(clock.now())
-                .binaryRow(testBinaryKey)
+                .readTimestampLong(clock.nowLong())
+                .binaryRowBytes(testBinaryKey.byteBuffer())
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -603,8 +604,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
-                .readTimestamp(clock.now())
-                .binaryRow(testBinaryKey)
+                .readTimestampLong(clock.nowLong())
+                .binaryRowBytes(testBinaryKey.byteBuffer())
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -674,8 +675,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .term(1L)
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
-                .lowerBound(toIndexBound(1))
-                .upperBound(toIndexBound(3))
+                .lowerBoundPrefix(toIndexBound(1))
+                .upperBoundPrefix(toIndexBound(3))
                 .flags(SortedIndexStorage.LESS_OR_EQUAL)
                 .batchSize(5)
                 .build());
@@ -693,7 +694,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .term(1L)
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
-                .lowerBound(toIndexBound(5))
+                .lowerBoundPrefix(toIndexBound(5))
                 .batchSize(5)
                 .build());
 
@@ -745,7 +746,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(scanTxId)
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(1L)
                 .indexToUse(sortedIndexId)
                 .batchSize(4)
@@ -760,7 +761,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(scanTxId)
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(1L)
                 .indexToUse(sortedIndexId)
                 .batchSize(4)
@@ -775,11 +776,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(TestTransactionIds.newTransactionId())
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
-                .lowerBound(toIndexBound(1))
-                .upperBound(toIndexBound(3))
+                .lowerBoundPrefix(toIndexBound(1))
+                .upperBoundPrefix(toIndexBound(3))
                 .flags(SortedIndexStorage.LESS_OR_EQUAL)
                 .batchSize(5)
                 .build());
@@ -793,10 +794,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(TestTransactionIds.newTransactionId())
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
-                .lowerBound(toIndexBound(5))
+                .lowerBoundPrefix(toIndexBound(5))
                 .batchSize(5)
                 .build());
 
@@ -809,7 +810,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(TestTransactionIds.newTransactionId())
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
                 .exactKey(toIndexKey(0))
@@ -847,7 +848,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(scanTxId)
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(1L)
                 .indexToUse(hashIndexId)
                 .exactKey(toIndexKey(0))
@@ -863,7 +864,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(scanTxId)
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(1L)
                 .indexToUse(hashIndexId)
                 .exactKey(toIndexKey(0))
@@ -879,7 +880,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(TestTransactionIds.newTransactionId())
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(hashIndexId)
                 .exactKey(toIndexKey(5))
@@ -895,7 +896,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
                 .transactionId(TestTransactionIds.newTransactionId())
-                .readTimestamp(clock.now())
+                .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(hashIndexId)
                 .exactKey(toIndexKey(1))
@@ -985,7 +986,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(txId)
                 .requestType(requestType)
-                .binaryRow(binaryRow)
+                .binaryRowBytes(binaryRow.byteBuffer())
                 .term(1L)
                 .commitPartitionId(commitPartitionId())
                 .build()
@@ -1001,7 +1002,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(txId)
                 .requestType(requestType)
-                .binaryRows(binaryRows)
+                
.binaryRowsBytes(binaryRows.stream().map(BinaryRow::byteBuffer).collect(toList()))
                 .term(1L)
                 .commitPartitionId(commitPartitionId())
                 .build()
@@ -1022,7 +1023,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                             .groupId(grpId)
                             .transactionId(txId)
                             .requestType(RequestType.RW_INSERT)
-                            .binaryRow(binaryRow)
+                            .binaryRowBytes(binaryRow.byteBuffer())
                             .term(1L)
                             .commitPartitionId(commitPartitionId())
                             .build();
@@ -1049,7 +1050,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                             .groupId(grpId)
                             .transactionId(txId)
                             .requestType(RequestType.RW_UPSERT_ALL)
-                            .binaryRows(asList(binaryRow0, binaryRow1))
+                            .binaryRowsBytes(asList(binaryRow0.byteBuffer(), 
binaryRow1.byteBuffer()))
                             .term(1L)
                             .commitPartitionId(commitPartitionId())
                             .build();
@@ -1213,7 +1214,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             Set<BinaryRow> expected = committed
                     ? (upsertAfterDelete ? allRows : allRowsButModified)
                     : (insertFirst ? allRows : Set.of());
-            Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, clock.now()));
+            Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, 
clock.nowLong()));
 
             assertEquals(expected.size(), res.size());
             for (BinaryRow e : expected) {
@@ -1221,7 +1222,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 res.contains(e);
             }
         } else {
-            BinaryRow res = roGet(br1, clock.now());
+            BinaryRow res = roGet(br1, clock.nowLong());
             BinaryRow expected = committed
                     ? (upsertAfterDelete ? br1 : null)
                     : (insertFirst ? br1 : null);
@@ -1465,8 +1466,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(targetTxId)
                         .requestType(RequestType.RW_REPLACE)
-                        .oldBinaryRow(binaryRow(key, new TestValue(1, "v1"), 
kvMarshaller))
-                        .binaryRow(binaryRow(key, new TestValue(3, "v3"), 
kvMarshaller))
+                        .oldBinaryRowBytes(binaryRow(key, new TestValue(1, 
"v1"), kvMarshaller).byteBuffer())
+                        .binaryRowBytes(binaryRow(key, new TestValue(3, "v3"), 
kvMarshaller).byteBuffer())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .build()
@@ -1528,7 +1529,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RW_UPSERT)
                 .transactionId(txId)
-                .binaryRow(row)
+                .binaryRowBytes(row.byteBuffer())
                 .term(1L)
                 .commitPartitionId(new TablePartitionId(tblId, partId))
                 .build()
@@ -1540,31 +1541,31 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RW_DELETE)
                 .transactionId(txId)
-                .binaryRow(row)
+                .binaryRowBytes(row.byteBuffer())
                 .term(1L)
                 .commitPartitionId(new TablePartitionId(tblId, partId))
                 .build()
         ).join();
     }
 
-    private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) {
+    private BinaryRow roGet(BinaryRow row, long readTimestamp) {
         CompletableFuture<?> future = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
                 .requestType(RequestType.RO_GET)
-                .readTimestamp(readTimestamp)
-                .binaryRow(row)
+                .readTimestampLong(readTimestamp)
+                .binaryRowBytes(row.byteBuffer())
                 .build()
         );
 
         return (BinaryRow) future.join();
     }
 
-    private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, 
HybridTimestamp readTimestamp) {
+    private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, long 
readTimestamp) {
         CompletableFuture<?> future = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyMultiRowReplicaRequest()
                 .groupId(grpId)
                 .requestType(RequestType.RO_GET_ALL)
-                .readTimestamp(readTimestamp)
-                .binaryRows(rows)
+                .readTimestampLong(readTimestamp)
+                
.binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
                 .build()
         );
 
@@ -1584,16 +1585,22 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         txState = TxState.COMMITED;
     }
 
-    private BinaryTuplePrefix toIndexBound(int val) {
+    private BinaryTupleMessage toIndexBound(int val) {
         ByteBuffer tuple = new BinaryTuplePrefixBuilder(1, 
1).appendInt(val).build();
 
-        return new BinaryTuplePrefix(1, tuple);
+        return TABLE_MESSAGES_FACTORY.binaryTupleMessage()
+                .tuple(tuple)
+                .elementCount(1)
+                .build();
     }
 
-    private BinaryTuple toIndexKey(int val) {
+    private BinaryTupleMessage toIndexKey(int val) {
         ByteBuffer tuple = new BinaryTupleBuilder(1, 
true).appendInt(val).build();
 
-        return new BinaryTuple(1, tuple);
+        return TABLE_MESSAGES_FACTORY.binaryTupleMessage()
+                .tuple(tuple)
+                .elementCount(1)
+                .build();
     }
 
     private BinaryRow nextBinaryKey() {

Reply via email to