This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 00181c783f IGNITE-19810 Remove excessive BinaryRow buffer copying (#2295)
00181c783f is described below

commit 00181c783f22e262a5fc0df488c4f64d0c9c75ee
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Tue Jul 11 11:06:44 2023 +0300

    IGNITE-19810 Remove excessive BinaryRow buffer copying (#2295)
---
 .idea/inspectionProfiles/Project_Default.xml       |   1 -
 .../internal/binarytuple/BinaryTupleCommon.java    |   6 +
 .../org/apache/ignite/internal/util/ByteUtils.java |  30 ---
 .../pagememory/util/PartitionlessLinks.java        |   2 +-
 .../ignite/internal/table/ItRoReadsTest.java       |  70 +++--
 .../apache/ignite/internal/schema/BinaryRow.java   |  18 +-
 .../ignite/internal/schema/BinaryRowImpl.java      |  96 +++++++
 .../ignite/internal/schema/ByteBufferRow.java      |  25 +-
 .../org/apache/ignite/internal/schema/row/Row.java |  28 +-
 .../ignite/internal/schema/row/RowAssembler.java   |  18 +-
 .../schema/marshaller/KvMarshallerTest.java        |   6 +-
 .../storage/AbstractMvPartitionStorageTest.java    |   2 +-
 .../storage/AbstractMvTableStorageTest.java        |  12 +-
 .../internal/storage/BaseMvStoragesTest.java       |   5 +-
 .../pagememory/mv/AbortWriteInvokeClosure.java     |   2 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  28 +-
 .../mv/AddWriteCommittedInvokeClosure.java         |   6 +-
 .../pagememory/mv/AddWriteInvokeClosure.java       |   8 +-
 .../storage/pagememory/mv/FindRowVersion.java      |  13 +-
 .../storage/pagememory/mv/ReadRowVersion.java      |  13 +-
 .../internal/storage/pagememory/mv/RowVersion.java |  38 ++-
 .../storage/pagememory/mv/ScanVersionsCursor.java  |  10 +-
 .../storage/pagememory/mv/io/RowVersionDataIo.java |  61 +++--
 .../internal/storage/rocksdb/GarbageCollector.java |   5 +-
 .../storage/rocksdb/PartitionDataHelper.java       |  14 +
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 292 ++++++++++++---------
 .../ItAbstractInternalTableScanTest.java           |  13 +-
 .../ignite/distributed/ItTablePersistenceTest.java |  10 +-
 .../ignite/internal/table/ItColocationTest.java    |   6 +-
 .../replicator/PartitionReplicaListener.java       |  20 +-
 .../table/ColocationHashCalculationTest.java       |   5 +-
 .../internal/table/MutableRowTupleAdapterTest.java |  35 ++-
 .../replication/PartitionReplicaListenerTest.java  |  12 +-
 .../table/type/NumericTypesSerializerTest.java     |   6 +-
 34 files changed, 514 insertions(+), 402 deletions(-)

diff --git a/.idea/inspectionProfiles/Project_Default.xml 
b/.idea/inspectionProfiles/Project_Default.xml
index dc1ec81c11..d730085aba 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -1148,7 +1148,6 @@
     <inspection_tool class="UseVirtualFileEquals" enabled="false" 
level="WARNING" enabled_by_default="false" />
     <inspection_tool class="VarargParameter" enabled="false" 
level="INFORMATION" enabled_by_default="false" />
     <inspection_tool class="VariablePatternShadow" enabled="true" 
level="WARNING" enabled_by_default="true" />
-    <inspection_tool class="VolatileArrayField" enabled="true" level="WARNING" 
enabled_by_default="true" />
     <inspection_tool class="WaitCalledOnCondition" enabled="true" 
level="WARNING" enabled_by_default="true" />
     <inspection_tool class="WaitNotInLoop" enabled="true" level="WARNING" 
enabled_by_default="true" />
     <inspection_tool class="WaitNotInSynchronizedContext" enabled="false" 
level="WARNING" enabled_by_default="false" />
diff --git 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
index 519737e42d..a4e09ca2e9 100644
--- 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
+++ 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
@@ -50,6 +50,12 @@ public class BinaryTupleCommon {
      */
     public static final int EQUALITY_FLAG = 1 << 4;
 
+    /**
+     * Flag used when a Binary Tuple represents a Table Row. If it is set, then this Row contains both Primary Key columns and Value
+     * columns.
+     */
+    public static final int ROW_HAS_VALUE_FLAG = 1 << 5;
+
     /**
      * Calculates flags for a given size of variable-length area.
      *
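The new flag lives in the first (flags) byte of the binary tuple, next to EQUALITY_FLAG. A minimal sketch of setting and testing it, mirroring RowAssembler.build() and BinaryRowImpl.hasValue() later in this diff (the class name and buffer contents are illustrative only):

    import java.nio.ByteBuffer;

    class RowHasValueFlagSketch {
        // Same value as BinaryTupleCommon.ROW_HAS_VALUE_FLAG above.
        static final int ROW_HAS_VALUE_FLAG = 1 << 5;

        public static void main(String[] args) {
            // Byte 0 of a binary tuple is its flags byte; the remaining bytes are illustrative payload.
            ByteBuffer tuple = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});

            // Mark the tuple as a full table row (key + value columns) in place, without copying the buffer.
            tuple.put(0, (byte) (tuple.get(0) | ROW_HAS_VALUE_FLAG));

            // Read the flag back the same way BinaryRowImpl.hasValue() does.
            boolean hasValue = (tuple.get(0) & ROW_HAS_VALUE_FLAG) != 0;

            System.out.println(hasValue); // true
        }
    }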
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index ba540654d1..dba531df0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.UUID;
 import org.apache.ignite.lang.IgniteInternalException;
 
 /**
@@ -153,35 +152,6 @@ public class ByteUtils {
         return bytes;
     }
 
-    /**
-     * Converts a {@link UUID} value to a byte array in Big Endian order and 
stores it in the specified byte array.
-     *
-     * @param uuid UUID.
-     * @param bytes Bytes array to write result to.
-     * @param off Offset in the target array to write result to.
-     * @return Number of bytes overwritten in {@code bytes} array.
-     */
-    public static byte[] putUuidToBytes(UUID uuid, byte[] bytes, int off) {
-        putLongToBytes(uuid.getMostSignificantBits(), bytes, off);
-        putLongToBytes(uuid.getLeastSignificantBits(), bytes, off + 
Long.BYTES);
-
-        return bytes;
-    }
-
-    /**
-     * Constructs {@link UUID} from byte array in Big Endian order.
-     *
-     * @param bytes Array of bytes.
-     * @param off Offset in {@code bytes} array.
-     * @return UUID value.
-     */
-    public static UUID bytesToUuid(byte[] bytes, int off) {
-        long msb = bytesToLong(bytes, off);
-        long lsb = bytesToLong(bytes, off + Long.BYTES);
-
-        return new UUID(msb, lsb);
-    }
-
     /**
      * Serializes an object to byte array using native java serialization 
mechanism.
      *
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java
index 9383021234..6e057fc0a3 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java
@@ -74,7 +74,7 @@ public class PartitionlessLinks {
      * @param link Link or page ID.
      * @return Number of bytes written (equal to {@link 
#PARTITIONLESS_LINK_SIZE_BYTES}).
      */
-    public static long writePartitionless(long addr, long link) {
+    public static int writePartitionless(long addr, long link) {
         putShort(addr, 0, (short) tag(link));
 
         putInt(addr + Short.BYTES, 0, pageIndex(link));
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 0fcb2c7da7..ff2a3b83f1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -25,24 +25,20 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
-import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
@@ -181,7 +177,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         res = internalTable.get(keyValueRow, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.byteBuffer(), keyValueRow.byteBuffer());
+        assertRowEquals(res, keyValueRow);
     }
 
     @Test
@@ -213,7 +209,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         BinaryRow res = internalTable.get(keyRow, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.byteBuffer(), keyValueRow2.byteBuffer());
+        assertRowEquals(res, keyValueRow2);
     }
 
     @Test
@@ -247,9 +243,9 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         CountDownLatch latch = new CountDownLatch(1);
 
-        List<ByteBuffer> list = new ArrayList<>();
+        List<BinaryRow> list = new ArrayList<>();
 
-        res.subscribe(new Subscriber<BinaryRow>() {
+        res.subscribe(new Subscriber<>() {
             @Override
             public void onSubscribe(Subscription subscription) {
                 subscription.request(100);
@@ -257,7 +253,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
             @Override
             public void onNext(BinaryRow item) {
-                list.add(item.byteBuffer());
+                list.add(item);
             }
 
             @Override
@@ -274,7 +270,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         assertEquals(1, list.size());
 
-        assertEquals(list.get(0), keyValueRow2.byteBuffer());
+        assertRowEquals(list.get(0), keyValueRow2);
     }
 
     @Test
@@ -302,13 +298,13 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest 
{
 
         BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.byteBuffer(), keyValueRow.byteBuffer());
+        assertRowEquals(res, keyValueRow);
 
         tx2.commit();
 
         res = internalTable.get(keyValueRow, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.byteBuffer(), keyValueRow2.byteBuffer());
+        assertRowEquals(res, keyValueRow2);
     }
 
     @Test
@@ -321,13 +317,13 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest 
{
         Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2);
         Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3);
 
-        Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, 
keyValueRow3);
+        List<BinaryRowEx> rowsToSearch = List.of(keyValueRow1, keyValueRow2, 
keyValueRow3);
 
         KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
 
-        Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, 
node.clock().now(), node.node()).get();
+        List<BinaryRow> res = internalTable.getAll(rowsToSearch, 
node.clock().now(), node.node()).get();
 
-        assertEquals(res.size(), 3);
+        assertEquals(3, res.size());
 
         node.transactions().runInTransaction(txs -> {
             for (int i = 0; i < 15; i++) {
@@ -337,13 +333,11 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest 
{
 
         res = internalTable.getAll(rowsToSearch, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.size(), 3);
-
-        Set<ByteBuffer> resultKeys = 
res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+        assertEquals(3, res.size());
 
-        assertTrue(resultKeys.contains(keyValueRow1.byteBuffer()));
-        assertTrue(resultKeys.contains(keyValueRow2.byteBuffer()));
-        assertTrue(resultKeys.contains(keyValueRow3.byteBuffer()));
+        for (int i = 0; i < 3; i++) {
+            assertRowEquals(res.get(i), rowsToSearch.get(i));
+        }
     }
 
     @Test
@@ -356,25 +350,23 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest 
{
         Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2);
         Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3);
 
-        Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, 
keyValueRow3);
+        List<BinaryRowEx> rowsToSearch = List.of(keyValueRow1, keyValueRow2, 
keyValueRow3);
 
         KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
 
-        Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, 
node.clock().now(), node.node()).get();
+        List<BinaryRow> res = internalTable.getAll(rowsToSearch, 
node.clock().now(), node.node()).get();
 
-        assertEquals(res.size(), 3);
+        assertEquals(3, res.size());
 
         populateData(node(), keyValueView, false);
 
         res = internalTable.getAll(rowsToSearch, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.size(), 3);
-
-        Set<ByteBuffer> resultKeys = 
res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+        assertEquals(3, res.size());
 
-        assertTrue(resultKeys.contains(keyValueRow1.byteBuffer()));
-        assertTrue(resultKeys.contains(keyValueRow2.byteBuffer()));
-        assertTrue(resultKeys.contains(keyValueRow3.byteBuffer()));
+        for (int i = 0; i < 3; i++) {
+            assertRowEquals(res.get(i), rowsToSearch.get(i));
+        }
 
         node.transactions().runInTransaction(txs -> {
             for (int i = 0; i < 15; i++) {
@@ -388,13 +380,11 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest 
{
 
         res = internalTable.getAll(rowsToSearch, node.clock().now(), 
node.node()).get();
 
-        assertEquals(res.size(), 3);
-
-        resultKeys = 
res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+        assertEquals(3, res.size());
 
-        assertTrue(resultKeys.contains(newKeyValueRow1.byteBuffer()));
-        assertTrue(resultKeys.contains(newKeyValueRow2.byteBuffer()));
-        assertTrue(resultKeys.contains(newKeyValueRow3.byteBuffer()));
+        assertRowEquals(res.get(0), newKeyValueRow1);
+        assertRowEquals(res.get(1), newKeyValueRow2);
+        assertRowEquals(res.get(2), newKeyValueRow3);
     }
 
     @Test
@@ -554,4 +544,10 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
     protected static IgniteImpl node() {
         return (IgniteImpl) NODE;
     }
+
+    private static void assertRowEquals(BinaryRow row1, BinaryRow row2) {
+        assertThat(row1.schemaVersion(), is(row2.schemaVersion()));
+        assertThat(row1.hasValue(), is(row2.hasValue()));
+        assertThat(row1.tupleSlice(), is(row2.tupleSlice()));
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index 8048e36e5f..a519a48a23 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -18,25 +18,22 @@
 package org.apache.ignite.internal.schema;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 /**
  * Binary row interface. Data layout is described in packages' {@code 
README.md}.
  */
 public interface BinaryRow {
+    ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
     /** Size of schema version field. */
     int SCHEMA_VERSION_FLD_LEN = Short.BYTES;
 
     /** Row schema version field offset. */
     int SCHEMA_VERSION_OFFSET = 0;
 
-    /** Size of 'has value' field. */
-    int HAS_VALUE_FLD_LEN = Byte.BYTES;
-
-    /** Row 'has value' field offset. */
-    int HAS_VALUE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN;
-
     /** Row binary tuple field offset. */
-    int TUPLE_OFFSET = HAS_VALUE_OFFSET + HAS_VALUE_FLD_LEN;
+    int TUPLE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN;
 
     /** Get row schema version. */
     int schemaVersion();
@@ -44,12 +41,13 @@ public interface BinaryRow {
     /** Get has value flag: {@code true} if row has non-null value, {@code 
false} otherwise. */
     boolean hasValue();
 
+    /** Get length of the {@link #tupleSlice()} in bytes. */
+    int tupleSliceLength();
+
     /** Get ByteBuffer slice representing the binary tuple. */
     ByteBuffer tupleSlice();
 
-    /** Get byte array of the row. */
-    byte[] bytes();
-
     /** Returns the representation of this row as a Byte Buffer. */
+    // TODO: remove this method, see https://issues.apache.org/jira/browse/IGNITE-19937
     ByteBuffer byteBuffer();
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
new file mode 100644
index 0000000000..c7d41c452b
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static 
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Binary row implementation that stores the schema version as a separate field.
+ */
+public class BinaryRowImpl implements BinaryRow {
+    private final int schemaVersion;
+
+    private final ByteBuffer binaryTuple;
+
+    /**
+     * Constructor.
+     *
+     * @param schemaVersion Schema version.
+     * @param binaryTuple Binary tuple.
+     */
+    public BinaryRowImpl(int schemaVersion, ByteBuffer binaryTuple) {
+        this.schemaVersion = schemaVersion;
+        this.binaryTuple = binaryTuple;
+    }
+
+    @Override
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    @Override
+    public boolean hasValue() {
+        return (binaryTuple.get(0) & ROW_HAS_VALUE_FLAG) != 0;
+    }
+
+    @Override
+    public ByteBuffer tupleSlice() {
+        return binaryTuple.duplicate().order(ORDER);
+    }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        return ByteBuffer.allocate(tupleSliceLength() + Short.BYTES)
+                .order(ORDER)
+                .putShort((short) schemaVersion())
+                .put(tupleSlice())
+                .rewind();
+    }
+
+    @Override
+    public int tupleSliceLength() {
+        return binaryTuple.remaining();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        BinaryRowImpl binaryRow = (BinaryRowImpl) o;
+
+        if (schemaVersion != binaryRow.schemaVersion) {
+            return false;
+        }
+
+        return binaryTuple.equals(binaryRow.binaryTuple);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = schemaVersion;
+        result = 31 * result + binaryTuple.hashCode();
+        return result;
+    }
+}
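A minimal usage sketch of the class above (tuple bytes are illustrative): the schema version is kept in a separate field, tupleSlice() hands out a zero-copy duplicate of the tuple buffer, and byteBuffer() only allocates when the old wire shape (2-byte schema version followed by the tuple) is actually requested:

    import java.nio.ByteBuffer;

    import org.apache.ignite.internal.schema.BinaryRow;
    import org.apache.ignite.internal.schema.BinaryRowImpl;

    class BinaryRowImplSketch {
        public static void main(String[] args) {
            // Four illustrative tuple bytes; byte 0 is the binary tuple flags byte.
            ByteBuffer binaryTuple = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});

            BinaryRow row = new BinaryRowImpl(1, binaryTuple);

            // The tuple itself is never copied.
            assert row.tupleSliceLength() == 4;

            // byteBuffer() allocates Short.BYTES + tupleSliceLength() bytes and prepends the schema version.
            assert row.byteBuffer().remaining() == 6;
        }
    }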
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index d732b3aa15..2eb9ab6594 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.schema;
 
+import static 
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG;
+
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
 /**
  * Heap byte buffer-based row.
  */
+// TODO: remove this class, see https://issues.apache.org/jira/browse/IGNITE-19937
 public class ByteBufferRow implements BinaryRow {
-    public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
-
     /** Row buffer. */
     private final ByteBuffer buf;
 
@@ -58,9 +58,7 @@ public class ByteBufferRow implements BinaryRow {
 
     @Override
     public boolean hasValue() {
-        byte schemaVer = buf.get(HAS_VALUE_OFFSET);
-
-        return schemaVer == 1;
+        return (buf.get(TUPLE_OFFSET) & ROW_HAS_VALUE_FLAG) != 0;
     }
 
     /** {@inheritDoc} */
@@ -75,19 +73,12 @@ public class ByteBufferRow implements BinaryRow {
 
     /** {@inheritDoc} */
     @Override
-    public byte[] bytes() {
-        // TODO IGNITE-15934 avoid copy.
-        byte[] tmp = new byte[buf.limit()];
-
-        buf.get(tmp);
-        buf.rewind();
-
-        return tmp;
+    public ByteBuffer byteBuffer() {
+        return buf.duplicate().order(ORDER);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public ByteBuffer byteBuffer() {
-        return buf.slice().order(ORDER);
+    public int tupleSliceLength() {
+        return buf.remaining() - TUPLE_OFFSET;
     }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 6b006a9f50..e3030422af 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -113,15 +113,14 @@ public class Row extends BinaryTupleReader implements 
BinaryRowEx, SchemaAware,
         return row.tupleSlice();
     }
 
-    /** {@inheritDoc} */
     @Override
-    public byte[] bytes() {
-        return row.bytes();
+    public ByteBuffer byteBuffer() {
+        return row.byteBuffer();
     }
 
     @Override
-    public ByteBuffer byteBuffer() {
-        return row.byteBuffer();
+    public int tupleSliceLength() {
+        return row.tupleSliceLength();
     }
 
     /** {@inheritDoc} */
@@ -146,4 +145,23 @@ public class Row extends BinaryTupleReader implements 
BinaryRowEx, SchemaAware,
     public BinaryTuple binaryTuple() {
         return new BinaryTuple(binaryTupleSchema.elementCount(), 
row.tupleSlice());
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Row row1 = (Row) o;
+
+        return row.equals(row1.row);
+    }
+
+    @Override
+    public int hashCode() {
+        return row.hashCode();
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
index 3598c24aac..6efccfed08 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.schema.row;
 
-import static org.apache.ignite.internal.schema.BinaryRow.HAS_VALUE_FLD_LEN;
-import static 
org.apache.ignite.internal.schema.BinaryRow.SCHEMA_VERSION_FLD_LEN;
-import static org.apache.ignite.internal.schema.ByteBufferRow.ORDER;
+import static 
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -34,8 +32,8 @@ import java.util.UUID;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.schema.AssemblyException;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.BitmaskNativeType;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
 import org.apache.ignite.internal.schema.DecimalNativeType;
@@ -653,13 +651,13 @@ public class RowAssembler {
      * @return Created {@link BinaryRow}.
      */
     public static BinaryRow build(ByteBuffer binTupleBuffer, int 
schemaVersion, boolean hasValue) {
-        ByteBuffer buffer = ByteBuffer.allocate(SCHEMA_VERSION_FLD_LEN + 
HAS_VALUE_FLD_LEN + binTupleBuffer.limit()).order(ORDER);
-        buffer.putShort((short) schemaVersion);
-        buffer.put(hasValue ? (byte) 1 : 0);
-        buffer.put(binTupleBuffer);
-        buffer.position(0);
+        if (hasValue) {
+            byte flags = binTupleBuffer.get(0);
 
-        return new ByteBufferRow(buffer);
+            binTupleBuffer.put(0, (byte) (flags | ROW_HAS_VALUE_FLAG));
+        }
+
+        return new BinaryRowImpl(schemaVersion, binTupleBuffer);
     }
 
     /**
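A caller-side sketch of the rewritten build() above (tuple bytes are illustrative): instead of copying the tuple into a freshly allocated buffer with a separate 'has value' byte, it flips ROW_HAS_VALUE_FLAG in the existing buffer and wraps it in a BinaryRowImpl:

    import java.nio.ByteBuffer;

    import org.apache.ignite.internal.schema.BinaryRow;
    import org.apache.ignite.internal.schema.row.RowAssembler;

    class RowAssemblerBuildSketch {
        public static void main(String[] args) {
            // Flags byte followed by illustrative payload bytes.
            ByteBuffer binTuple = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});

            BinaryRow row = RowAssembler.build(binTuple, 1, true);

            // The row wraps binTuple directly; only bit 5 of the flags byte changed.
            assert row.hasValue();
            assert row.tupleSlice().get(0) == (1 << 5);
        }
    }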
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
index a0d9e8aff1..cd7d79bd29 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
@@ -499,9 +499,9 @@ public class KvMarshallerTest {
         BinaryRow row4 = marshaller4.marshal(1L, new 
TestPojoWrapper(serializedPojo));
 
         // Verify all rows are equivalent.
-        assertArrayEquals(row.bytes(), row2.bytes());
-        assertArrayEquals(row.bytes(), row3.bytes());
-        assertArrayEquals(row.bytes(), row4.bytes());
+        assertEquals(row, row2);
+        assertEquals(row, row3);
+        assertEquals(row, row4);
 
         // Check key.
         assertEquals(1L, marshaller1.unmarshalKey(new Row(schema, row)));
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 2e41e9aefd..c9d7659512 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -83,7 +83,7 @@ public abstract class AbstractMvPartitionStorageTest extends 
BaseMvPartitionStor
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, 
BinaryRow, UUID, UUID, int)}.
+     * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, 
BinaryRow, UUID, int, int)}.
      */
     @Test
     public void testAddWrite() {
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 6f0f07a14d..7a3e5ebefd 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -28,7 +28,6 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -1030,13 +1029,14 @@ public abstract class AbstractMvTableStorageTest 
extends BaseMvStoragesTest {
             List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
     ) {
         for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
-            List<byte[]> allVersions = 
mvPartitionStorage.runConsistently(locker -> {
+            List<BinaryRow> allVersions = 
mvPartitionStorage.runConsistently(locker -> {
                 locker.lock(row.get1());
 
-                return 
toListOfByteArrays(mvPartitionStorage.scanVersions(row.get1()));
+                return 
toListOfBinaryRows(mvPartitionStorage.scanVersions(row.get1()));
             });
 
-            assertThat(allVersions, containsInAnyOrder(row.get2().bytes()));
+            assertThat(allVersions, hasSize(1));
+            assertRowMatches(allVersions.get(0), row.get2());
 
             IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
             IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
@@ -1056,9 +1056,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         assertEquals(expLastAppliedTerm, storage.lastAppliedTerm());
     }
 
-    private static List<byte[]> toListOfByteArrays(Cursor<ReadResult> cursor) {
+    private static List<BinaryRow> toListOfBinaryRows(Cursor<ReadResult> 
cursor) {
         try (cursor) {
-            return 
cursor.stream().map(ReadResult::binaryRow).map(BinaryRow::bytes).collect(toList());
+            return 
cursor.stream().map(ReadResult::binaryRow).collect(toList());
         }
     }
 
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 164d1fbbdc..9a751332e5 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage;
 
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -146,7 +145,9 @@ public abstract class BaseMvStoragesTest {
 
     protected final void assertRowMatches(@Nullable BinaryRow 
rowUnderQuestion, BinaryRow expectedRow) {
         assertThat(rowUnderQuestion, is(notNullValue()));
-        assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
+        assertThat(rowUnderQuestion.schemaVersion(), 
is(expectedRow.schemaVersion()));
+        assertThat(rowUnderQuestion.hasValue(), is(expectedRow.hasValue()));
+        assertThat(rowUnderQuestion.tupleSlice(), 
is(expectedRow.tupleSlice()));
     }
 
     /**
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
index 1cb8845d4a..1b466baa2b 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
@@ -81,7 +81,7 @@ class AbortWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
             operationType = OperationType.REMOVE;
         }
 
-        previousUncommittedRowVersion = 
storage.rowVersionToBinaryRow(latestVersion);
+        previousUncommittedRowVersion = latestVersion.value();
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index c4d43d137b..ca43e10670 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -41,7 +41,6 @@ import 
org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
 import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
 import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -91,8 +90,6 @@ import org.jetbrains.annotations.Nullable;
  * </ul>
  */
 public abstract class AbstractPageMemoryMvPartitionStorage implements 
MvPartitionStorage {
-    private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
-
     static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp -> 
true;
 
     static final Predicate<HybridTimestamp> DONT_LOAD_VALUE = timestamp -> 
false;
@@ -365,9 +362,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
             return writeIntentToResult(versionChain, rowVersion, 
newestCommitTs);
         } else {
-            BinaryRow row = rowVersionToBinaryRow(rowVersion);
-
-            return ReadResult.createFromCommitted(versionChain.rowId(), row, 
rowVersion.timestamp());
+            return ReadResult.createFromCommitted(versionChain.rowId(), 
rowVersion.value(), rowVersion.timestamp());
         }
     }
 
@@ -401,14 +396,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         return findRowVersion.getResult();
     }
 
-    @Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) {
-        if (rowVersion.isTombstone()) {
-            return null;
-        }
-
-        return new ByteBufferRow(rowVersion.value());
-    }
-
     /**
      * Finds row version by timestamp. See {@link 
MvPartitionStorage#read(RowId, HybridTimestamp)} for details on the API.
      *
@@ -480,7 +467,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
                 if (curCommit.isTombstone()) {
                     row = null;
                 } else {
-                    row = new ByteBufferRow(curCommit.value());
+                    row = curCommit.value();
                 }
 
                 return ReadResult.createFromCommitted(chain.rowId(), row, 
curCommit.timestamp());
@@ -503,11 +490,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         int commitTableId = chain.commitTableId();
         int commitPartitionId = chain.commitPartitionId();
 
-        BinaryRow row = rowVersionToBinaryRow(rowVersion);
-
         return ReadResult.createFromWriteIntent(
                 chain.rowId(),
-                row,
+                rowVersion.value(),
                 transactionId,
                 commitTableId,
                 commitPartitionId,
@@ -523,11 +508,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         }
     }
 
-    static byte[] rowBytes(@Nullable BinaryRow row) {
-        // TODO IGNITE-16913 Add proper way to write row bytes into array 
without allocations.
-        return row == null ? TOMBSTONE_PAYLOAD : row.bytes();
-    }
-
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, int commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
@@ -946,7 +926,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
         RowVersion removedRowVersion = removeWriteOnGc(rowId, rowTimestamp, 
gcRowVersion.getLink());
 
-        return rowVersionToBinaryRow(removedRowVersion);
+        return removedRowVersion.value();
     }
 
     private RowVersion removeWriteOnGc(RowId rowId, HybridTimestamp 
rowTimestamp, long rowLink) {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
index 02876a9ce1..40d44e9364 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE;
-import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
@@ -127,9 +125,7 @@ class AddWriteCommittedInvokeClosure implements 
InvokeClosure<VersionChain> {
     }
 
     private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row, 
HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
-        byte[] rowBytes = rowBytes(row);
-
-        RowVersion rowVersion = new RowVersion(storage.partitionId, 
commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+        RowVersion rowVersion = new RowVersion(storage.partitionId, 
commitTimestamp, nextPartitionlessLink, row);
 
         storage.insertRowVersion(rowVersion);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
index e2fd214319..2058bac162 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
-import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
 import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
@@ -96,7 +94,7 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
         if (oldRow.isUncommitted()) {
             RowVersion currentVersion = 
storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE);
 
-            previousUncommittedRowVersion = 
storage.rowVersionToBinaryRow(currentVersion);
+            previousUncommittedRowVersion = currentVersion.value();
 
             // As we replace an uncommitted version with new one, we need to 
remove old uncommitted version.
             toRemove = currentVersion;
@@ -125,9 +123,7 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
     }
 
     private RowVersion insertRowVersion(@Nullable BinaryRow row, long 
nextPartitionlessLink) {
-        byte[] rowBytes = rowBytes(row);
-
-        RowVersion rowVersion = new RowVersion(storage.partitionId, 
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+        RowVersion rowVersion = new RowVersion(storage.partitionId, 
nextPartitionlessLink, row);
 
         storage.insertRowVersion(rowVersion);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
index b42e3e9417..03a8833cab 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
@@ -27,7 +27,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
 import org.apache.ignite.internal.pagememory.util.PageUtils;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import 
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
 import org.jetbrains.annotations.Nullable;
 
@@ -51,6 +53,8 @@ class FindRowVersion implements 
PageMemoryTraversal<RowVersionFilter> {
 
     private int rowValueSize;
 
+    private int schemaVersion;
+
     private @Nullable RowVersion result;
 
     FindRowVersion(int partitionId, boolean loadValueBytes) {
@@ -75,6 +79,7 @@ class FindRowVersion implements 
PageMemoryTraversal<RowVersionFilter> {
         rowLink = link;
         rowTimestamp = HybridTimestamps.readTimestamp(pageAddr, 
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
         rowNextLink = nextLink;
+        schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, 
payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET));
 
         if (loadValueBytes) {
             return readRowVersionValue.consumePagePayload(link, pageAddr, 
payload, null);
@@ -96,9 +101,11 @@ class FindRowVersion implements 
PageMemoryTraversal<RowVersionFilter> {
 
             byte[] valueBytes = readRowVersionValue.result();
 
-            ByteBuffer value = 
ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+            BinaryRow row = valueBytes.length == 0
+                    ? null
+                    : new BinaryRowImpl(schemaVersion, 
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
 
-            result = new RowVersion(partitionId, rowLink, rowTimestamp, 
rowNextLink, value);
+            result = new RowVersion(partitionId, rowLink, rowTimestamp, 
rowNextLink, row);
         } else {
             result = new RowVersion(partitionId, rowLink, rowTimestamp, 
rowNextLink, rowValueSize);
         }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index ef44a3a70c..a6af6ef551 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -26,7 +26,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
 import org.apache.ignite.internal.pagememory.util.PageUtils;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -45,6 +47,8 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>>
 
     private long nextLink;
 
+    private int schemaVersion;
+
     private final ReadRowVersionValue readRowVersionValue = new 
ReadRowVersionValue();
 
     ReadRowVersion(int partitionId) {
@@ -67,6 +71,7 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>>
 
         timestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset() 
+ RowVersion.TIMESTAMP_OFFSET);
         nextLink = readPartitionless(partitionId, pageAddr, payload.offset() + 
RowVersion.NEXT_LINK_OFFSET);
+        schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, 
payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET));
 
         if (!loadValue.test(timestamp)) {
             int valueSize = PageUtils.getInt(pageAddr, payload.offset() + 
RowVersion.VALUE_SIZE_OFFSET);
@@ -90,9 +95,11 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>>
 
         byte[] valueBytes = readRowVersionValue.result();
 
-        ByteBuffer value = 
ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+        BinaryRow row = valueBytes.length == 0
+                ? null
+                : new BinaryRowImpl(schemaVersion, 
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
 
-        result = new RowVersion(partitionIdFromLink(firstFragmentLink), 
firstFragmentLink, timestamp, nextLink, value);
+        result = new RowVersion(partitionIdFromLink(firstFragmentLink), 
firstFragmentLink, timestamp, nextLink, row);
     }
 
     RowVersion result() {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index a006e8d57c..e849393ca9 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -20,12 +20,12 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.Storable;
 import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.pagememory.util.PartitionlessLinks;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
@@ -37,11 +37,13 @@ import org.jetbrains.annotations.Nullable;
 public final class RowVersion implements Storable {
     private static final int NEXT_LINK_STORE_SIZE_BYTES = 
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
     private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
+    private static final int SCHEMA_VERSION_SIZE_BYTES = Short.BYTES;
 
     public static final int TIMESTAMP_OFFSET = 0;
     public static final int NEXT_LINK_OFFSET = TIMESTAMP_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
     public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + 
NEXT_LINK_STORE_SIZE_BYTES;
-    public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + 
VALUE_SIZE_STORE_SIZE_BYTES;
+    public static final int SCHEMA_VERSION_OFFSET = VALUE_SIZE_OFFSET + 
VALUE_SIZE_STORE_SIZE_BYTES;
+    public static final int VALUE_OFFSET = SCHEMA_VERSION_OFFSET + 
SCHEMA_VERSION_SIZE_BYTES;
 
     private final int partitionId;
 
@@ -54,32 +56,32 @@ public final class RowVersion implements Storable {
     private final int valueSize;
 
     @IgniteToStringExclude
-    private final @Nullable ByteBuffer value;
+    private final @Nullable BinaryRow value;
 
     /**
      * Constructor.
      */
-    public RowVersion(int partitionId, long nextLink, ByteBuffer value) {
+    public RowVersion(int partitionId, long nextLink, @Nullable BinaryRow 
value) {
         this(partitionId, 0, null, nextLink, value);
     }
 
     /**
      * Constructor.
      */
-    public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long 
nextLink, ByteBuffer value) {
+    public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long 
nextLink, @Nullable BinaryRow value) {
         this(partitionId, 0, commitTimestamp, nextLink, value);
     }
 
     /**
      * Constructor.
      */
-    public RowVersion(int partitionId, long link, @Nullable HybridTimestamp 
timestamp, long nextLink, @Nullable ByteBuffer value) {
+    public RowVersion(int partitionId, long link, @Nullable HybridTimestamp 
timestamp, long nextLink, @Nullable BinaryRow value) {
         this.partitionId = partitionId;
         link(link);
 
         this.timestamp = timestamp;
         this.nextLink = nextLink;
-        this.valueSize = value == null ? 0 : value.limit();
+        this.valueSize = value == null ? 0 : value.tupleSliceLength();
         this.value = value;
     }
 
@@ -111,7 +113,7 @@ public final class RowVersion implements Storable {
         return valueSize;
     }
 
-    public @Nullable ByteBuffer value() {
+    public @Nullable BinaryRow value() {
         return value;
     }
 
@@ -120,17 +122,9 @@ public final class RowVersion implements Storable {
     }
 
     boolean isTombstone() {
-        return isTombstone(valueSize());
-    }
-
-    static boolean isTombstone(int valueSize) {
         return valueSize == 0;
     }
 
-    static boolean isTombstone(byte[] valueBytes) {
-        return isTombstone(valueBytes.length);
-    }
-
     boolean isUncommitted() {
         return timestamp == null;
     }
@@ -140,30 +134,28 @@ public final class RowVersion implements Storable {
     }
 
     @Override
-    public final void link(long link) {
+    public void link(long link) {
         this.link = link;
     }
 
     @Override
-    public final long link() {
+    public long link() {
         return link;
     }
 
     @Override
-    public final int partition() {
+    public int partition() {
         return partitionId;
     }
 
     @Override
     public int size() {
-        assert value != null;
-
-        return headerSize() + value.limit();
+        return headerSize() + valueSize;
     }
 
     @Override
     public int headerSize() {
-        return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES + 
VALUE_SIZE_STORE_SIZE_BYTES;
+        return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES + 
VALUE_SIZE_STORE_SIZE_BYTES + SCHEMA_VERSION_SIZE_BYTES;
     }
 
     @Override
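With the schema version added to the header, a stored row version is laid out as: hybrid timestamp, partitionless next link (a 2-byte tag plus a 4-byte page index, per PartitionlessLinks above), a 4-byte value size, a 2-byte schema version, and then the binary tuple bytes of the row; a zero value size still marks a tombstone.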
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
index 0616b8cd1f..420aa19794 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
@@ -21,8 +21,6 @@ import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
 
 import java.util.NoSuchElementException;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -102,16 +100,12 @@ class ScanVersionsCursor implements Cursor<ReadResult> {
             if (rowVersion.isTombstone()) {
                 return ReadResult.empty(rowId);
             } else {
-                BinaryRow row = new ByteBufferRow(rowVersion.value());
-
-                return ReadResult.createFromCommitted(rowId, row, 
rowVersion.timestamp());
+                return ReadResult.createFromCommitted(rowId, 
rowVersion.value(), rowVersion.timestamp());
             }
         } else {
-            BinaryRow row = rowVersion.isTombstone() ? null : new 
ByteBufferRow(rowVersion.value());
-
             return ReadResult.createFromWriteIntent(
                     rowId,
-                    row,
+                    rowVersion.value(),
                     versionChain.transactionId(),
                     versionChain.commitTableId(),
                     versionChain.commitPartitionId(),
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
index 9c5d8037d7..9f606c3d73 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.pagememory.util.PartitionlessLinks;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.pagememory.mv.HybridTimestamps;
 import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
 import org.apache.ignite.lang.IgniteStringBuilder;
@@ -50,45 +51,69 @@ public class RowVersionDataIo extends 
AbstractDataPageIo<RowVersion> {
     }
 
     @Override
-    protected void writeRowData(long pageAddr, int dataOff, int payloadSize, 
RowVersion row, boolean newRow) {
+    protected void writeRowData(long pageAddr, int dataOff, int payloadSize, 
RowVersion rowVersion, boolean newRow) {
         assertPageType(pageAddr);
 
-        long addr = pageAddr + dataOff;
+        int offset = dataOff;
 
-        putShort(addr, 0, (short) payloadSize);
-        addr += Short.BYTES;
+        putShort(pageAddr, offset, (short) payloadSize);
+        offset += Short.BYTES;
 
-        addr += HybridTimestamps.writeTimestampToMemory(addr, 0, 
row.timestamp());
+        offset += HybridTimestamps.writeTimestampToMemory(pageAddr, offset, 
rowVersion.timestamp());
 
-        addr += writePartitionless(addr, row.nextLink());
+        offset += writePartitionless(pageAddr + offset, rowVersion.nextLink());
 
-        putInt(addr, 0, row.valueSize());
-        addr += Integer.BYTES;
+        putInt(pageAddr, offset, rowVersion.valueSize());
+        offset += Integer.BYTES;
 
-        putByteBuffer(addr, 0, row.value());
+        BinaryRow row = rowVersion.value();
+
+        if (row != null) {
+            putShort(pageAddr, offset, (short) row.schemaVersion());
+            offset += Short.BYTES;
+
+            putByteBuffer(pageAddr, offset, row.tupleSlice());
+        } else {
+            putShort(pageAddr, offset, (short) 0);
+        }
     }
 
     @Override
-    protected void writeFragmentData(RowVersion row, ByteBuffer pageBuf, int 
rowOff, int payloadSize) {
+    protected void writeFragmentData(RowVersion rowVersion, ByteBuffer 
pageBuf, int rowOff, int payloadSize) {
         assertPageType(pageBuf);
 
+        int headerSize = rowVersion.headerSize();
+
+        BinaryRow row = rowVersion.value();
+
+        int bufferOffset;
+        int bufferSize;
+
         if (rowOff == 0) {
             // first fragment
-            assert row.headerSize() <= payloadSize : "Header must entirely fit 
in the first fragment, but header size is "
-                    + row.headerSize() + " and payload size is " + payloadSize;
+            assert headerSize <= payloadSize : "Header must entirely fit in 
the first fragment, but header size is "
+                    + headerSize + " and payload size is " + payloadSize;
+
+            HybridTimestamps.writeTimestampToBuffer(pageBuf, 
rowVersion.timestamp());
 
-            HybridTimestamps.writeTimestampToBuffer(pageBuf, row.timestamp());
+            PartitionlessLinks.writeToBuffer(pageBuf, rowVersion.nextLink());
 
-            PartitionlessLinks.writeToBuffer(pageBuf, row.nextLink());
+            pageBuf.putInt(rowVersion.valueSize());
 
-            pageBuf.putInt(row.valueSize());
+            pageBuf.putShort(row == null ? 0 : (short) row.schemaVersion());
 
-            putValueBufferIntoPage(pageBuf, row.value(), 0, payloadSize - 
row.headerSize());
+            bufferOffset = 0;
+            bufferSize = payloadSize - headerSize;
         } else {
             // non-first fragment
-            assert rowOff >= row.headerSize();
+            assert rowOff >= headerSize;
+
+            bufferOffset = rowOff - headerSize;
+            bufferSize = payloadSize;
+        }
 
-            putValueBufferIntoPage(pageBuf, row.value(), rowOff - 
row.headerSize(), payloadSize);
+        if (row != null) {
+            putValueBufferIntoPage(pageBuf, row.tupleSlice(), bufferOffset, 
bufferSize);
         }
     }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index 5a1b55c707..b6fe2fbf3c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -23,7 +23,7 @@ import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SI
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
@@ -34,7 +34,6 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TAB
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.jetbrains.annotations.Nullable;
@@ -246,7 +245,7 @@ class GarbageCollector {
 
                 assert valueBytes.length > 0; // Can't be a tombstone.
 
-                var row = new 
ByteBufferRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                var row = deserializeRow(ByteBuffer.wrap(valueBytes));
 
                 // Delete the row from the data cf.
                 batch.delete(partCf, dataKey);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index 2a782efc11..9ea674b5e8 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -34,6 +34,8 @@ import java.nio.ByteOrder;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
@@ -251,6 +253,18 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         return it;
     }
 
+    /**
+     * Converts the internal serialized representation of a binary row into its Java object counterpart.
+     */
+    static BinaryRow deserializeRow(ByteBuffer buffer) {
+        assert buffer.order() == ByteOrder.BIG_ENDIAN;
+
+        int schemaVersion = Short.toUnsignedInt(buffer.getShort());
+        ByteBuffer binaryTupleSlice = 
buffer.slice().order(TABLE_ROW_BYTE_ORDER);
+
+        return new BinaryRowImpl(schemaVersion, binaryTupleSlice);
+    }
+
     @Override
     public void close() {
         RocksUtils.closeAll(upperBoundReadOpts, upperBound);
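
For orientation, a minimal sketch of the value layout that deserializeRow()
expects: a big-endian schema version prefix followed by the binary tuple bytes,
mirroring writeBinaryRow() added to RocksDbMvPartitionStorage below. The local
names ("buf", "row", "copy") are illustrative and not part of the patch:

    ByteBuffer buf = ByteBuffer.allocate(Short.BYTES + row.tupleSliceLength());
    buf.putShort((short) row.schemaVersion()); // 2-byte schema version prefix.
    buf.put(row.tupleSlice());                 // Raw binary tuple payload.
    buf.flip();
    BinaryRow copy = deserializeRow(buf);      // Reads the prefix, slices the tuple.
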
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index d41024a759..922d9caa5f 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -19,19 +19,17 @@ package org.apache.ignite.internal.storage.rocksdb;
 
 import static java.lang.ThreadLocal.withInitial;
 import static java.nio.ByteBuffer.allocate;
+import static java.nio.ByteBuffer.allocateDirect;
 import static java.util.Arrays.copyOf;
 import static java.util.Arrays.copyOfRange;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_OFFSET;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_PREFIX_SIZE;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ID_OFFSET;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.THREAD_LOCAL_STATE;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TX_ID_OFFSET;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_HEADER_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_OFFSET;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
@@ -43,11 +41,7 @@ import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
-import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
-import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid;
-import static org.apache.ignite.internal.util.ByteUtils.putUuidToBytes;
-import static org.apache.ignite.internal.util.GridUnsafe.getInt;
-import static org.apache.ignite.internal.util.GridUnsafe.getShort;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -61,7 +55,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -72,9 +65,7 @@ import 
org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.storage.util.LocalLocker;
 import org.apache.ignite.internal.storage.util.StorageState;
-import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
@@ -114,20 +105,22 @@ import org.rocksdb.WriteBatchWithIndex;
  * <p>BE means Big Endian, meaning that lexicographical bytes order matches a 
natural order of partitions.
  *
  * <p>DESC means that timestamps are sorted from newest to oldest (N2O).
- * Please refer to {@link PartitionDataHelper#putTimestampDesc(ByteBuffer, 
HybridTimestamp)}
- * to see how it's achieved. Missing timestamp could be interpreted as a 
moment infinitely far away in the future.
+ * Please refer to {@link PartitionDataHelper#putTimestampDesc(ByteBuffer, 
HybridTimestamp)} to see how it's achieved. Missing timestamp
+ * could be interpreted as a moment infinitely far away in the future.
  */
 public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Thread-local on-heap byte buffer instance to use for key 
manipulations. */
     private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = 
withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
 
+    private static final ThreadLocal<ByteBuffer> DIRECT_KEY_BUFFER = 
withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
     /** Table storage instance. */
     private final RocksDbTableStorage tableStorage;
 
     /**
      * Partition ID (should be treated as an unsigned short).
      *
-     * <p/>Partition IDs are always stored in the big endian order, since they 
need to be compared lexicographically.
+     * <p>Partition IDs are always stored in the big endian order, since they 
need to be compared lexicographically.
      */
     private final int partitionId;
 
@@ -187,7 +180,6 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         helper = new PartitionDataHelper(tableId, partitionId, 
tableStorage.partitionCfHandle());
         gc = new GarbageCollector(helper, db, tableStorage.gcQueueHandle());
 
-
         lastAppliedIndexAndTermKey = createKey(PARTITION_META_PREFIX, tableId, 
partitionId);
         lastGroupConfigKey = createKey(PARTITION_CONF_PREFIX, tableId, 
partitionId);
 
@@ -331,7 +323,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public byte @Nullable [] committedGroupConfiguration() {
-        byte[] array =  busy(() -> {
+        byte[] array = busy(() -> {
             ThreadLocalState state = THREAD_LOCAL_STATE.get();
 
             return state == null ? lastGroupConfig : state.pendingGroupConfig;
@@ -374,40 +366,43 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             assert rowIsLocked(rowId);
 
-            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId).rewind();
 
             BinaryRow res = null;
 
             try {
                 // Check concurrent transaction data.
-                byte[] keyBufArray = keyBuf.array();
+                byte[] keyBytes = new byte[ROW_PREFIX_SIZE];
 
-                byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
+                keyBuf.get(keyBytes);
 
-                byte[] previousValue = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, keyBytes);
+                keyBuf.rewind();
+
+                byte[] previousValueBytes = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, keyBytes);
 
                 // Previous value must belong to the same transaction.
-                if (previousValue != null) {
+                if (previousValueBytes != null) {
+                    ByteBuffer previousValue = 
ByteBuffer.wrap(previousValueBytes);
+
                     validateTxId(previousValue, txId);
 
                     res = wrapValueIntoBinaryRow(previousValue, true);
                 }
 
                 if (row == null) {
+                    ByteBuffer value = allocate(VALUE_HEADER_SIZE);
+
                     // Write empty value as a tombstone.
-                    if (previousValue != null) {
+                    if (previousValueBytes != null) {
                         // Reuse old array with transaction id already written 
to it.
-                        writeBatch.put(helper.partCf, keyBytes, 
copyOf(previousValue, VALUE_HEADER_SIZE));
+                        value.put(previousValueBytes, 0, VALUE_HEADER_SIZE);
                     } else {
-                        byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
-
-                        putUuidToBytes(txId, valueHeaderBytes, TX_ID_OFFSET);
-                        putInt(valueHeaderBytes, TABLE_ID_OFFSET, 
commitTableId);
-
-                        writeBatch.put(helper.partCf, keyBytes, 
valueHeaderBytes);
+                        writeHeader(value, txId, commitTableId, 
commitPartitionId);
                     }
+
+                    writeBatch.put(helper.partCf, keyBytes, value.array());
                 } else {
-                    writeUnversioned(keyBufArray, row, txId, commitTableId, 
commitPartitionId);
+                    writeUnversioned(keyBytes, row, txId, commitTableId, 
commitPartitionId);
                 }
             } catch (RocksDBException e) {
                 throw new StorageException("Failed to update a row in storage: 
" + createStorageInfo(), e);
@@ -429,24 +424,37 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             throws RocksDBException {
         @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
PartitionDataHelper.requireWriteBatch();
 
-        byte[] rowBytes = rowBytes(row);
+        ByteBuffer value = allocate(rowSize(row) + VALUE_HEADER_SIZE);
 
-        ByteBuffer value = allocate(rowBytes.length + VALUE_HEADER_SIZE);
-        byte[] array = value.array();
+        writeHeader(value, txId, commitTableId, commitPartitionId);
 
-        putUuidToBytes(txId, array, TX_ID_OFFSET);
-        putInt(array, TABLE_ID_OFFSET, commitTableId);
-        putShort(array, PARTITION_ID_OFFSET, (short) commitPartitionId);
-
-        value.position(VALUE_OFFSET).put(rowBytes);
+        writeBinaryRow(value, row);
 
         // Write table row data as a value.
-        writeBatch.put(helper.partCf, copyOf(keyArray, ROW_PREFIX_SIZE), 
value.array());
+        writeBatch.put(helper.partCf, keyArray, value.array());
+    }
+
+    private static int rowSize(BinaryRow row) {
+        // Tuple + schema version.
+        return row.tupleSliceLength() + Short.BYTES;
+    }
+
+    private static void writeHeader(ByteBuffer dest, UUID txId, int 
commitTableId, int commitPartitionId) {
+        assert dest.order() == ByteOrder.BIG_ENDIAN;
+
+        dest
+                .putLong(txId.getMostSignificantBits())
+                .putLong(txId.getLeastSignificantBits())
+                .putInt(commitTableId)
+                .putShort((short) commitPartitionId);
     }
 
-    private static byte[] rowBytes(BinaryRow row) {
-        //TODO IGNITE-16913 Add proper way to write row bytes into array 
without allocations.
-        return row.bytes();
+    private static void writeBinaryRow(ByteBuffer dest, BinaryRow row) {
+        assert dest.order() == ByteOrder.BIG_ENDIAN;
+
+        dest
+                .putShort((short) row.schemaVersion())
+                .put(row.tupleSlice());
     }
 
     @Override
@@ -458,10 +466,16 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             assert rowIsLocked(rowId);
 
-            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            ByteBuffer keyBuf = prepareDirectKeyBuf(rowId)
+                    .position(0)
+                    .limit(ROW_PREFIX_SIZE);
 
             try {
-                byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+                byte[] keyBytes = new byte[ROW_PREFIX_SIZE];
+
+                keyBuf.get(keyBytes);
+
+                keyBuf.rewind();
 
                 byte[] previousValue = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, keyBytes);
 
@@ -471,9 +485,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 }
 
                 // Perform unconditional remove for the key without associated 
timestamp.
-                writeBatch.delete(helper.partCf, keyBytes);
+                writeBatch.delete(helper.partCf, keyBuf);
 
-                return wrapValueIntoBinaryRow(previousValue, true);
+                return wrapValueIntoBinaryRow(ByteBuffer.wrap(previousValue), 
true);
             } catch (RocksDBException e) {
                 throw new StorageException("Failed to roll back 
insert/update", e);
             }
@@ -546,9 +560,6 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             boolean isNewValueTombstone = row == null;
 
-            //TODO IGNITE-16913 Add proper way to write row bytes into array 
without allocations.
-            byte[] rowBytes = row != null ? rowBytes(row) : 
ArrayUtils.BYTE_EMPTY_ARRAY;
-
             boolean newAndPrevTombstones; // Both this and previous values for 
the row id are tombstones.
             try {
                 newAndPrevTombstones = gc.tryAddToGcQueue(writeBatch, rowId, 
commitTimestamp, isNewValueTombstone);
@@ -559,8 +570,21 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             // We only write tombstone if the previous value for the same row 
id was not a tombstone.
             // So there won't be consecutive tombstones for the same row id.
             if (!newAndPrevTombstones) {
+                //TODO IGNITE-16913 Add proper way to write row bytes into 
array without allocations.
+                byte[] rowBytes;
+
+                if (row == null) {
+                    rowBytes = BYTE_EMPTY_ARRAY;
+                } else {
+                    ByteBuffer rowBuffer = allocate(rowSize(row));
+
+                    writeBinaryRow(rowBuffer, row);
+
+                    rowBytes = rowBuffer.array();
+                }
+
                 try {
-                    writeBatch.put(helper.partCf, copyOf(keyBuf.array(), 
MAX_KEY_SIZE), rowBytes);
+                    writeBatch.put(helper.partCf, keyBuf.array(), rowBytes);
                 } catch (RocksDBException e) {
                     throw new StorageException("Failed to update a row in 
storage: " + createStorageInfo(), e);
                 }
@@ -606,23 +630,25 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     private ReadResult readLatestVersion(RowId rowId, RocksIterator 
seekIterator) {
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+        ByteBuffer keyBuf = prepareDirectKeyBuf(rowId);
 
         // Seek to the first appearance of row id if timestamp isn't set.
         // Since timestamps are sorted from newest to oldest, first occurrence 
will always be the latest version.
-        // Unfortunately, copy here is unavoidable with current API.
         assert keyBuf.position() == ROW_PREFIX_SIZE;
-        seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+
+        
seekIterator.seek(keyBuf.duplicate().position(0).limit(ROW_PREFIX_SIZE));
 
         if (invalid(seekIterator)) {
             // No data at all.
             return ReadResult.empty(rowId);
         }
 
-        ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE);
+        ByteBuffer readKeyBuf = DIRECT_KEY_BUFFER.get().clear();
 
         int keyLength = seekIterator.key(readKeyBuf);
 
+        readKeyBuf.position(0).limit(keyLength);
+
         if (!matches(rowId, readKeyBuf)) {
             // It is already a different row, so no version exists for our 
rowId.
             return ReadResult.empty(rowId);
@@ -630,14 +656,12 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-        byte[] valueBytes = seekIterator.value();
+        ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value());
 
         return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, 
valueBytes);
     }
 
-    private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, 
ByteBuffer keyBuf, byte[] valueBytes) {
-        assert valueBytes != null;
-
+    private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, 
ByteBuffer keyBuf, ByteBuffer valueBytes) {
         RowId rowId = getRowId(keyBuf);
 
         if (!isWriteIntent) {
@@ -671,7 +695,8 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
     /**
      * Walks "version chain" via the iterator to find a row by timestamp.
-     * See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details.
+     *
+     * <p>See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for 
details.
      *
      * @param seekIterator Iterator, on which seek operation was already 
performed.
      * @param rowId Row id.
@@ -684,7 +709,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         // There's no guarantee that required key even exists. If it doesn't, 
then "seek" will point to a different key.
         // To avoid returning its value, we have to check that actual key 
matches what we need.
         // Here we prepare direct buffer to read key without timestamp. Shared 
direct buffer is used to avoid extra memory allocations.
-        ByteBuffer foundKeyBuf = 
MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE);
+        ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().clear();
 
         int keyLength = 0;
 
@@ -703,7 +728,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return ReadResult.empty(rowId);
             }
 
-            foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
+            foundKeyBuf.clear();
             keyLength = seekIterator.key(foundKeyBuf);
 
             if (!matches(rowId, foundKeyBuf)) {
@@ -711,11 +736,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return ReadResult.empty(rowId);
             }
 
-            byte[] valueBytes = seekIterator.value();
-
             boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
             if (isWriteIntent) {
+                ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value());
+
                 // Let's check if there is a committed write.
                 seekIterator.next();
 
@@ -724,7 +749,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     return wrapUncommittedValue(rowId, valueBytes, null);
                 }
 
-                foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
+                foundKeyBuf.clear();
                 seekIterator.key(foundKeyBuf);
 
                 if (!matches(rowId, foundKeyBuf)) {
@@ -741,7 +766,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             HybridTimestamp rowTimestamp = readTimestampDesc(foundKeyBuf);
 
-            byte[] valueBytes = seekIterator.value();
+            ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value());
 
             if (rowTimestamp.equals(timestamp)) {
                 // This is exactly the row we are looking for.
@@ -757,7 +782,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
             }
 
-            foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
+            foundKeyBuf.clear();
             keyLength = seekIterator.key(foundKeyBuf);
 
             if (!matches(rowId, foundKeyBuf)) {
@@ -768,7 +793,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
             if (isWriteIntent) {
-                return wrapUncommittedValue(rowId, seekIterator.value(), 
rowTimestamp);
+                return wrapUncommittedValue(rowId, 
ByteBuffer.wrap(seekIterator.value()), rowTimestamp);
             }
 
             return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(foundKeyBuf));
@@ -819,7 +844,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
                     boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                    return readResultFromKeyAndValue(isWriteIntent, 
ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
+                    return readResultFromKeyAndValue(
+                            isWriteIntent,
+                            ByteBuffer.wrap(key).order(KEY_BYTE_ORDER),
+                            ByteBuffer.wrap(value)
+                    );
                 }
 
                 @Override
@@ -880,7 +909,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            ByteBuffer keyBuf = 
prepareHeapKeyBuf(lowerBound).rewind().limit(ROW_PREFIX_SIZE);
+            ByteBuffer keyBuf = prepareDirectKeyBuf(lowerBound)
+                    .position(0)
+                    .limit(ROW_PREFIX_SIZE);
 
             try (RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts)) {
                 it.seek(keyBuf);
@@ -891,13 +922,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     return null;
                 }
 
-                ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().rewind().limit(ROW_PREFIX_SIZE);
+                keyBuf.rewind();
 
-                it.key(readKeyBuf);
+                it.key(keyBuf);
 
-                return getRowId(readKeyBuf);
-            } finally {
-                keyBuf.limit(MAX_KEY_SIZE);
+                return getRowId(keyBuf);
             }
         });
     }
@@ -1012,29 +1041,36 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * Prepares thread-local on-heap byte buffer. Writes row id in it. 
Partition id is already there. Timestamp is not cleared.
      */
     private ByteBuffer prepareHeapKeyBuf(RowId rowId) {
-        assert rowId.partitionId() == partitionId : rowId;
+        ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().clear();
 
-        ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().rewind();
+        writeKey(keyBuf, rowId);
 
-        keyBuf.putInt(tableStorage.getTableId());
-        keyBuf.putShort((short) rowId.partitionId());
+        return keyBuf;
+    }
+
+    private ByteBuffer prepareDirectKeyBuf(RowId rowId) {
+        ByteBuffer keyBuf = DIRECT_KEY_BUFFER.get().clear();
 
-        helper.putRowId(keyBuf, rowId);
+        writeKey(keyBuf, rowId);
 
         return keyBuf;
     }
 
-    private static void putShort(byte[] array, int off, short value) {
-        GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value);
-    }
+    private void writeKey(ByteBuffer buffer, RowId rowId) {
+        assert buffer.order() == KEY_BYTE_ORDER;
+        assert rowId.partitionId() == partitionId : rowId;
 
-    private static void putInt(byte[] array, int off, int value) {
-        GridUnsafe.putInt(array, GridUnsafe.BYTE_ARR_OFF + off, value);
+        buffer.putInt(tableStorage.getTableId());
+        buffer.putShort((short) rowId.partitionId());
+
+        helper.putRowId(buffer, rowId);
     }
 
-    private static void validateTxId(byte[] valueBytes, UUID txId) {
-        long msb = bytesToLong(valueBytes);
-        long lsb = bytesToLong(valueBytes, Long.BYTES);
+    private static void validateTxId(ByteBuffer valueBytes, UUID txId) {
+        long msb = valueBytes.getLong();
+        long lsb = valueBytes.getLong();
+
+        valueBytes.rewind();
 
         if (txId.getMostSignificantBits() != msb || 
txId.getLeastSignificantBits() != lsb) {
             throw new TxIdMismatchException(txId, new UUID(msb, lsb));
@@ -1067,37 +1103,41 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * @param valueHasTxData Whether the value has a transaction id prefix in 
it.
      * @return Binary row instance or {@code null} if value is a tombstone.
      */
-    private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[] 
valueBytes, boolean valueHasTxData) {
+    private static @Nullable BinaryRow wrapValueIntoBinaryRow(ByteBuffer 
valueBytes, boolean valueHasTxData) {
         if (isTombstone(valueBytes, valueHasTxData)) {
             return null;
         }
 
-        return valueHasTxData
-                ? new 
ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(TABLE_ROW_BYTE_ORDER))
-                : new ByteBufferRow(valueBytes);
+        assert valueBytes.order() == ByteOrder.BIG_ENDIAN;
+
+        if (valueHasTxData) {
+            valueBytes.position(VALUE_OFFSET);
+        }
+
+        return deserializeRow(valueBytes);
     }
 
     /**
-     * Converts raw byte array representation of the write-intent value into a 
read result adding newest commit timestamp if
-     * it is not {@code null}.
+     * Converts raw byte array representation of the write-intent value into a 
read result adding newest commit timestamp if it is not
+     * {@code null}.
      *
      * @param rowId ID of the corresponding row.
-     * @param valueBytes Value bytes as read from the storage.
+     * @param valueBuffer Value bytes as read from the storage.
      * @param newestCommitTs Commit timestamp of the most recent committed 
write of this value.
      * @return Read result instance.
      */
-    private static ReadResult wrapUncommittedValue(RowId rowId, byte[] 
valueBytes, @Nullable HybridTimestamp newestCommitTs) {
-        UUID txId = bytesToUuid(valueBytes, TX_ID_OFFSET);
-        int commitTableId = getInt(valueBytes, GridUnsafe.BYTE_ARR_OFF + 
TABLE_ID_OFFSET);
-        int commitPartitionId = getShort(valueBytes, GridUnsafe.BYTE_ARR_OFF + 
PARTITION_ID_OFFSET) & 0xFFFF;
+    private static ReadResult wrapUncommittedValue(RowId rowId, ByteBuffer 
valueBuffer, @Nullable HybridTimestamp newestCommitTs) {
+        assert valueBuffer.order() == ByteOrder.BIG_ENDIAN;
 
-        BinaryRow row;
+        UUID txId = new UUID(valueBuffer.getLong(), valueBuffer.getLong());
 
-        if (isTombstone(valueBytes, true)) {
-            row = null;
-        } else {
-            row = new 
ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(TABLE_ROW_BYTE_ORDER));
-        }
+        int commitTableId = valueBuffer.getInt();
+
+        int commitPartitionId = Short.toUnsignedInt(valueBuffer.getShort());
+
+        BinaryRow row = valueBuffer.remaining() == 0 ? null : 
deserializeRow(valueBuffer);
+
+        valueBuffer.rewind();
 
         return ReadResult.createFromWriteIntent(rowId, row, txId, 
commitTableId, commitPartitionId, newestCommitTs);
     }
@@ -1110,19 +1150,23 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * @param rowCommitTimestamp Timestamp with which the row was committed.
      * @return Read result instance or {@code null} if value is a tombstone.
      */
-    private static ReadResult wrapCommittedValue(RowId rowId, byte[] 
valueBytes, HybridTimestamp rowCommitTimestamp) {
-        if (isTombstone(valueBytes, false)) {
+    private static ReadResult wrapCommittedValue(RowId rowId, ByteBuffer 
valueBytes, HybridTimestamp rowCommitTimestamp) {
+        if (valueBytes.remaining() == 0) {
             return ReadResult.empty(rowId);
         }
 
-        return ReadResult.createFromCommitted(rowId, new 
ByteBufferRow(valueBytes), rowCommitTimestamp);
+        return ReadResult.createFromCommitted(
+                rowId,
+                deserializeRow(valueBytes),
+                rowCommitTimestamp
+        );
     }
 
     /**
      * Returns {@code true} if value payload represents a tombstone.
      */
-    private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) {
-        return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0);
+    private static boolean isTombstone(ByteBuffer valueBytes, boolean hasTxId) 
{
+        return valueBytes.limit() == (hasTxId ? VALUE_HEADER_SIZE : 0);
     }
 
     private abstract class BasePartitionTimestampCursor implements 
PartitionTimestampCursor {
@@ -1218,11 +1262,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().rewind();
+            ByteBuffer currentKeyBuffer = DIRECT_KEY_BUFFER.get();
 
             while (true) {
-                currentKeyBuffer.rewind();
-
                 // At this point, seekKeyBuf should contain row id that's 
above the one we already scanned, but not greater than any
                 // other row id in partition. When we start, row id is filled 
with zeroes. Value during the iteration is described later
                 // in this code. Now let's describe what we'll find, assuming 
that iterator found something:
@@ -1246,11 +1288,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 }
 
                 // Read the actual key into a direct buffer.
-                int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE));
+                int keyLength = it.key(currentKeyBuffer.clear());
 
-                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+                currentKeyBuffer.position(0).limit(keyLength);
 
-                currentKeyBuffer.limit(ROW_PREFIX_SIZE);
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
                 RowId rowId = getRowId(currentKeyBuffer);
 
@@ -1291,18 +1333,14 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     }
                 }
 
-                currentKeyBuffer.limit(keyLength);
-
                 assert valueBytes != null;
 
-                ReadResult readResult;
+                ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes);
 
-                if (!isWriteIntent) {
-                    // There is no write-intent, return latest committed row.
-                    readResult = wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(currentKeyBuffer));
-                } else {
-                    readResult = wrapUncommittedValue(rowId, valueBytes, 
nextCommitTimestamp);
-                }
+                ReadResult readResult = isWriteIntent
+                        ? wrapUncommittedValue(rowId, valueBuffer, 
nextCommitTimestamp)
+                        // There is no write-intent, return latest committed 
row.
+                        : wrapCommittedValue(rowId, valueBuffer, 
readTimestampDesc(currentKeyBuffer));
 
                 if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                     next = readResult;
@@ -1336,7 +1374,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer directBuffer = MV_KEY_BUFFER.get().rewind();
+            ByteBuffer directBuffer = DIRECT_KEY_BUFFER.get();
 
             while (true) {
                 //TODO IGNITE-18201 Remove copying.
@@ -1347,7 +1385,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 }
 
                 // We need to figure out what current row id is.
-                it.key(directBuffer.rewind());
+                it.key(directBuffer.clear());
 
                 RowId rowId = getRowId(directBuffer);
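
For orientation, a minimal sketch of the write-intent value format produced by
writeHeader() and writeBinaryRow() above and consumed by wrapUncommittedValue().
The local variable names are illustrative only; a value holding just the header
(no row payload) is treated as a tombstone:

    ByteBuffer value = ByteBuffer.allocate(VALUE_HEADER_SIZE + rowSize(row));
    writeHeader(value, txId, commitTableId, commitPartitionId); // Tx id (16) + table id (4) + partition id (2) bytes, big-endian.
    writeBinaryRow(value, row);                                 // Schema version (2 bytes) + tuple slice.

    value.rewind();
    UUID readTxId = new UUID(value.getLong(), value.getLong());
    int readCommitTableId = value.getInt();
    int readCommitPartitionId = Short.toUnsignedInt(value.getShort());
    BinaryRow readRow = value.remaining() == 0 ? null : deserializeRow(value);
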
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 8e9f93565a..dabd1d3d2d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.ignite.distributed;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,7 +42,6 @@ import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -442,14 +442,7 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
 
         subscriberAllDataAwaitLatch.await();
 
-        assertEquals(submittedItems.size(), retrievedItems.size());
-
-        List<byte[]> expItems = 
submittedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList());
-        List<byte[]> gotItems = 
retrievedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList());
-
-        for (int i = 0; i < expItems.size(); i++) {
-            assertArrayEquals(expItems.get(i), gotItems.get(i));
-        }
+        assertThat(submittedItems, is(retrievedItems));
 
         if (tx != null) {
             tx.commit();
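
The assertThat(submittedItems, is(retrievedItems)) check above compares the
BinaryRow lists directly, which assumes content-based equality on the row
implementation (BinaryRowImpl is added by this commit). A minimal sketch of
that assumption; the literal values are illustrative only:

    BinaryRow a = new BinaryRowImpl(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
    BinaryRow b = new BinaryRowImpl(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
    assertThat(List.of(a), is(List.of(b))); // Holds only if equals() compares schema version and tuple bytes.
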
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 83a62f14d9..f3c99a37ab 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.when;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -228,20 +227,21 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                     MvPartitionStorage partitionStorage = 
mvPartitionStorages.get(storageIndex);
 
                     Map<ByteBuffer, RowId> primaryIndex = 
rowsToRowIds(partitionStorage);
-                    RowId rowId = 
primaryIndex.get(req0.binaryRow().byteBuffer());
+                    RowId rowId = primaryIndex.get(req0.binaryRowBytes());
+
                     BinaryRow row = partitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE).binaryRow();
 
                     return completedFuture(row);
                 }
 
                 // Non-null binary row if UPSERT, otherwise it's implied that 
request type is DELETE.
-                BinaryRow binaryRow = req0.requestType() == 
RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                ByteBuffer binaryRow = req0.requestType() == 
RequestType.RW_UPSERT ? req0.binaryRowBytes() : null;
 
                 UpdateCommand cmd = msgFactory.updateCommand()
                         .txId(req0.transactionId())
                         .tablePartitionId(tablePartitionId(new 
TablePartitionId(1, 0)))
                         .rowUuid(new RowId(0).uuid())
-                        .rowBuffer(binaryRow == null ? null : 
binaryRow.byteBuffer())
+                        .rowBuffer(binaryRow)
                         .safeTimeLong(hybridClock.nowLong())
                         .build();
 
@@ -319,7 +319,7 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                 return false;
             }
 
-            return Arrays.equals(value.bytes(), 
read.binaryRow().tupleSlice().array());
+            return value.tupleSlice().equals(read.binaryRow().tupleSlice());
         };
     }
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 879b357fd2..c28c31dc3c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -49,6 +49,7 @@ import java.util.Set;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -62,7 +63,6 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
@@ -196,9 +196,9 @@ public class ItColocationTest {
             RaftGroupService r = groupRafts.get(request.groupId());
 
             if (request instanceof ReadWriteMultiRowReplicaRequest) {
-                Map<UUID, ByteBuffer> rows = 
((ReadWriteMultiRowReplicaRequest) request).binaryRows()
+                Map<UUID, ByteBuffer> rows = 
((ReadWriteMultiRowReplicaRequest) request).binaryRowsBytes()
                         .stream()
-                        .collect(toMap(row -> 
TestTransactionIds.newTransactionId(), BinaryRow::byteBuffer));
+                        .collect(toMap(row -> 
TestTransactionIds.newTransactionId(), Function.identity()));
 
                 return r.run(MSG_FACTORY.updateAllCommand()
                                 
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b9d9c88c3c..84b23325d3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1835,7 +1835,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow.byteBuffer(), txId))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1857,7 +1857,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow.byteBuffer(), txId))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1879,7 +1879,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow.byteBuffer(), txId))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow, txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1897,7 +1897,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(rowIdLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow.byteBuffer(), txId))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow, txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -1915,7 +1915,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(rowLock -> applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow.byteBuffer(), txId))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow, txId))
                                     .thenApply(ignored -> rowLock))
                             .thenApply(rowIdLock -> {
                                 // Release short term locks.
@@ -2077,7 +2077,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             }
 
                             return applyUpdateCommand(
-                                    updateCommand(commitPartitionId, 
validatedRowId.get1().uuid(), newRow.byteBuffer(), txId))
+                                    updateCommand(commitPartitionId, 
validatedRowId.get1().uuid(), newRow, txId))
                                     .thenApply(ignored -> validatedRowId)
                                     .thenApply(rowIdLock -> {
                                         // Release short term locks.
@@ -2331,19 +2331,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      *
      * @param tablePartId {@link TablePartitionId} object to construct {@link 
UpdateCommand} object with.
      * @param rowUuid Row UUID.
-     * @param rowBuf {@link ByteBuffer} representation of {@link BinaryRow}.
+     * @param row Row.
      * @param txId Transaction ID.
      * @return Constructed {@link UpdateCommand} object.
      */
-    private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID 
rowUuid, @Nullable ByteBuffer rowBuf, UUID txId) {
+    private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID 
rowUuid, @Nullable BinaryRow row, UUID txId) {
         UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowUuid(rowUuid)
                 .txId(txId)
                 .safeTimeLong(hybridClock.nowLong());
 
-        if (rowBuf != null) {
-            bldr.rowBuffer(rowBuf);
+        if (row != null) {
+            bldr.rowBuffer(row.byteBuffer());
         }
 
         return bldr.build();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java
index 47fae65462..f0aa965be3 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java
@@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.Random;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.DecimalNativeType;
 import org.apache.ignite.internal.schema.NativeType;
@@ -112,7 +111,7 @@ public class ColocationHashCalculationTest {
             Tuple t = TableRow.tuple(r);
             t.set(rndCol.name(), SchemaTestUtils.generateRandomValue(rnd, 
rndCol.type()));
 
-            r = new Row(schema, new 
ByteBufferRow(marshaller.marshal(t).bytes()));
+            r = marshaller.marshal(t);
 
             assertEquals(colocationHash(r), r.colocationHash());
         }
@@ -129,7 +128,7 @@ public class ColocationHashCalculationTest {
             t.set(c.name(), SchemaTestUtils.generateRandomValue(rnd, 
c.type()));
         }
 
-        return new Row(schema, new 
ByteBufferRow(marshaller.marshal(t).bytes()));
+        return marshaller.marshal(t);
     }
 
     private int colocationHash(Row r) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java
index 56bb7b6f85..d99c167236 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java
@@ -53,7 +53,6 @@ import java.time.temporal.Temporal;
 import java.util.BitSet;
 import java.util.Random;
 import java.util.UUID;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -222,7 +221,7 @@ public class MutableRowTupleAdapterTest {
 
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(schema));
 
-        Row row = new Row(schema, new 
ByteBufferRow(marshaller.marshal(original).bytes()));
+        Row row = marshaller.marshal(original);
 
         Tuple key = TableRow.keyTuple(row);
         Tuple val = TableRow.valueTuple(row);
@@ -248,7 +247,7 @@ public class MutableRowTupleAdapterTest {
     public void testRowTupleMutability() throws TupleMarshallerException {
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(schema));
 
-        Row row = new Row(schema, new 
ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt")).bytes()));
+        Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt"));
 
         Tuple tuple = TableRow.tuple(row);
         Tuple key = TableRow.keyTuple(row);
@@ -275,7 +274,7 @@ public class MutableRowTupleAdapterTest {
     public void testKeyValueTupleMutability() throws TupleMarshallerException {
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(schema));
 
-        Row row = new Row(schema, new 
ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt")).bytes()));
+        Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt"));
 
         Tuple tuple = TableRow.tuple(row);
         Tuple key = TableRow.keyTuple(row);
@@ -304,7 +303,7 @@ public class MutableRowTupleAdapterTest {
     public void testRowTupleSchemaAwareness() throws TupleMarshallerException {
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(schema));
 
-        Row row = new Row(schema, new 
ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt")).bytes()));
+        Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt"));
 
         Tuple tuple = TableRow.tuple(row);
         Tuple key = TableRow.keyTuple(row);
@@ -327,7 +326,7 @@ public class MutableRowTupleAdapterTest {
     public void testKeyValueTupleSchemaAwareness() throws 
TupleMarshallerException {
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(schema));
 
-        Row row = new Row(schema, new 
ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt")).bytes()));
+        Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", 
"Shirt"));
 
         Tuple tuple = TableRow.tuple(row);
         Tuple key = TableRow.keyTuple(row);
@@ -372,7 +371,7 @@ public class MutableRowTupleAdapterTest {
                 .set("valNumberCol", BigInteger.valueOf(rnd.nextLong()))
                 .set("valDecimalCol", BigDecimal.valueOf(rnd.nextLong(), 5));
 
-        Tuple rowTuple = TableRow.tuple(new Row(fullSchema, new 
ByteBufferRow(marshaller.marshal(tuple).bytes())));
+        Tuple rowTuple = TableRow.tuple(marshaller.marshal(tuple));
 
         assertEquals(tuple, rowTuple);
 
@@ -406,7 +405,7 @@ public class MutableRowTupleAdapterTest {
 
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(fullSchema));
 
-        Row row = new Row(fullSchema, new 
ByteBufferRow(marshaller.marshal(tup1).bytes()));
+        Row row = marshaller.marshal(tup1);
 
         Tuple tup2 = deserializeTuple(serializeTuple(TableRow.tuple(row)));
 
@@ -454,11 +453,11 @@ public class MutableRowTupleAdapterTest {
         // Check tuples backed with Row.
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(fullSchema));
 
-        Row row = new Row(fullSchema, new 
ByteBufferRow(marshaller.marshal(keyTuple, valTuple).bytes()));
+        Row row = marshaller.marshal(keyTuple, valTuple);
 
         Tuple rowKeyTuple = TableRow.keyTuple(row);
         Tuple rowValTuple = TableRow.valueTuple(row);
-        final Tuple rowTuple = TableRow.tuple(new Row(fullSchema, new 
ByteBufferRow(marshaller.marshal(tuple).bytes())));
+        final Tuple rowTuple = TableRow.tuple(marshaller.marshal(tuple));
 
         assertEquals(keyTuple, rowKeyTuple);
         assertEquals(rowKeyTuple, keyTuple);
@@ -537,7 +536,7 @@ public class MutableRowTupleAdapterTest {
 
         TupleMarshaller marshaller = new TupleMarshallerImpl(new 
DummySchemaManagerImpl(fullSchema));
 
-        Row row = new Row(fullSchema, new 
ByteBufferRow(marshaller.marshal(key1, val1).bytes()));
+        Row row = marshaller.marshal(key1, val1);
 
         Tuple key2 = deserializeTuple(serializeTuple(TableRow.keyTuple(row)));
         Tuple val2 = deserializeTuple(serializeTuple(TableRow.valueTuple(row)));
@@ -568,7 +567,7 @@ public class MutableRowTupleAdapterTest {
 
         TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schemaDescriptor));
 
-        Row row = new Row(schemaDescriptor, new ByteBufferRow(marshaller.marshal(tuple).bytes()));
+        Row row = marshaller.marshal(tuple);
 
         Tuple tuple1 = deserializeTuple(serializeTuple(TableRow.tuple(row)));
 
@@ -594,14 +593,14 @@ public class MutableRowTupleAdapterTest {
                 .set("string", "abcefghi")
                 .set("bytes", new byte[]{1, 2, 3, 4, 5});
 
-        assertThrowsWithCause(() -> marshaller.marshal(tuple1).bytes(), InvalidTypeException.class, "Column's type mismatch");
-        assertThrowsWithCause(() -> marshaller.marshal(tuple2).bytes(), InvalidTypeException.class, "Column's type mismatch");
+        assertThrowsWithCause(() -> marshaller.marshal(tuple1), InvalidTypeException.class, "Column's type mismatch");
+        assertThrowsWithCause(() -> marshaller.marshal(tuple2), InvalidTypeException.class, "Column's type mismatch");
 
         Tuple expected = Tuple.create().set("key", 1)
                 .set("string", "abc")
                 .set("bytes", new byte[]{1, 2, 3});
 
-        Row row = new Row(schemaDescriptor, new ByteBufferRow(marshaller.marshal(expected).bytes()));
+        Row row = marshaller.marshal(expected);
 
         assertEquals(expected, deserializeTuple(serializeTuple(TableRow.tuple(row))));
     }
@@ -619,7 +618,7 @@ public class MutableRowTupleAdapterTest {
 
         Tuple tuple1 = Tuple.create().set("key", 1).set("decimal", new 
BigDecimal("123456.7"));
 
-        assertThrowsWithCause(() -> marshaller.marshal(tuple1).bytes(), SchemaMismatchException.class,
+        assertThrowsWithCause(() -> marshaller.marshal(tuple1), SchemaMismatchException.class,
                 "Failed to set decimal value for column 'decimal' (max precision exceeds allocated precision)");
     }
 
@@ -637,7 +636,7 @@ public class MutableRowTupleAdapterTest {
         Tuple tuple = Tuple.create().set("key", 1).set("decimal", new 
BigDecimal("123.458"));
         Tuple expected = Tuple.create().set("key", 1).set("decimal", new 
BigDecimal("123.46")); // Rounded.
 
-        Row row = new Row(schemaDescriptor, new ByteBufferRow(marshaller.marshal(tuple).bytes()));
+        Row row = marshaller.marshal(tuple);
 
         assertEquals(expected, deserializeTuple(serializeTuple(TableRow.tuple(row))));
     }
@@ -700,7 +699,7 @@ public class MutableRowTupleAdapterTest {
 
             TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema));
 
-            return TableRow.tuple(new Row(schema, new ByteBufferRow(marshaller.marshal(original).bytes())));
+            return TableRow.tuple(marshaller.marshal(original));
         } catch (TupleMarshallerException e) {
             return fail();
         }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 89ea3eb4ab..e9fbfe4e29 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1073,7 +1072,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
                 BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
 
-                if (row != null && Arrays.equals(binaryRow.bytes(), row.bytes())) {
+                if (rowEquals(binaryRow, row)) {
                     found = true;
                 }
             }
@@ -1084,7 +1083,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
             BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
 
-            assertTrue(row == null || !Arrays.equals(row.bytes(), binaryRow.bytes()));
+            assertTrue(row == null || !rowEquals(row, binaryRow));
         }
     }
 
@@ -1230,7 +1229,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             if (expected == null) {
                 assertNull(res);
             } else {
-                assertArrayEquals(expected.bytes(), res.bytes());
+                assertTrue(rowEquals(expected, res));
             }
         }
 
@@ -1655,6 +1654,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
     }
 
+    private static boolean rowEquals(BinaryRow row1, BinaryRow row2) {
+        return row1.schemaVersion() == row2.schemaVersion()
+                && row1.hasValue() == row2.hasValue()
+                && row1.tupleSlice().equals(row2.tupleSlice());
+    }
 
     /**
      * Test pojo key.
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
index a658999c56..8223090ebd 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.type;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -273,10 +272,7 @@ public class NumericTypesSerializerTest {
         Row firstRow = marshaller.marshal(firstTup);
         Row secondRow = marshaller.marshal(secondTup);
 
-        byte[] firstRowInBytes = firstRow.bytes();
-        byte[] secondRowInBytes = secondRow.bytes();
-
-        assertArrayEquals(firstRowInBytes, secondRowInBytes);
+        assertEquals(firstRow, secondRow);
     }
 
     @NotNull
