This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 00181c783f IGNITE-19810 Remove excessive BinaryRow buffer copying (#2295) 00181c783f is described below commit 00181c783f22e262a5fc0df488c4f64d0c9c75ee Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Tue Jul 11 11:06:44 2023 +0300 IGNITE-19810 Remove excessive BinaryRow buffer copying (#2295) --- .idea/inspectionProfiles/Project_Default.xml | 1 - .../internal/binarytuple/BinaryTupleCommon.java | 6 + .../org/apache/ignite/internal/util/ByteUtils.java | 30 --- .../pagememory/util/PartitionlessLinks.java | 2 +- .../ignite/internal/table/ItRoReadsTest.java | 70 +++-- .../apache/ignite/internal/schema/BinaryRow.java | 18 +- .../ignite/internal/schema/BinaryRowImpl.java | 96 +++++++ .../ignite/internal/schema/ByteBufferRow.java | 25 +- .../org/apache/ignite/internal/schema/row/Row.java | 28 +- .../ignite/internal/schema/row/RowAssembler.java | 18 +- .../schema/marshaller/KvMarshallerTest.java | 6 +- .../storage/AbstractMvPartitionStorageTest.java | 2 +- .../storage/AbstractMvTableStorageTest.java | 12 +- .../internal/storage/BaseMvStoragesTest.java | 5 +- .../pagememory/mv/AbortWriteInvokeClosure.java | 2 +- .../mv/AbstractPageMemoryMvPartitionStorage.java | 28 +- .../mv/AddWriteCommittedInvokeClosure.java | 6 +- .../pagememory/mv/AddWriteInvokeClosure.java | 8 +- .../storage/pagememory/mv/FindRowVersion.java | 13 +- .../storage/pagememory/mv/ReadRowVersion.java | 13 +- .../internal/storage/pagememory/mv/RowVersion.java | 38 ++- .../storage/pagememory/mv/ScanVersionsCursor.java | 10 +- .../storage/pagememory/mv/io/RowVersionDataIo.java | 61 +++-- .../internal/storage/rocksdb/GarbageCollector.java | 5 +- .../storage/rocksdb/PartitionDataHelper.java | 14 + .../storage/rocksdb/RocksDbMvPartitionStorage.java | 292 ++++++++++++--------- .../ItAbstractInternalTableScanTest.java | 13 +- .../ignite/distributed/ItTablePersistenceTest.java | 10 +- .../ignite/internal/table/ItColocationTest.java | 6 +- .../replicator/PartitionReplicaListener.java | 20 +- .../table/ColocationHashCalculationTest.java | 5 +- .../internal/table/MutableRowTupleAdapterTest.java | 35 ++- .../replication/PartitionReplicaListenerTest.java | 12 +- .../table/type/NumericTypesSerializerTest.java | 6 +- 34 files changed, 514 insertions(+), 402 deletions(-) diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index dc1ec81c11..d730085aba 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -1148,7 +1148,6 @@ <inspection_tool class="UseVirtualFileEquals" enabled="false" level="WARNING" enabled_by_default="false" /> <inspection_tool class="VarargParameter" enabled="false" level="INFORMATION" enabled_by_default="false" /> <inspection_tool class="VariablePatternShadow" enabled="true" level="WARNING" enabled_by_default="true" /> - <inspection_tool class="VolatileArrayField" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="WaitCalledOnCondition" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="WaitNotInLoop" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="WaitNotInSynchronizedContext" enabled="false" level="WARNING" enabled_by_default="false" /> diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java index 519737e42d..a4e09ca2e9 100644 --- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java +++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java @@ -50,6 +50,12 @@ public class BinaryTupleCommon { */ public static final int EQUALITY_FLAG = 1 << 4; + /** + * Flag used when a Binary Tuple represents a Table Row. If it is set, then this Row contains both a Primary Key columns and Value + * columns. + */ + public static final int ROW_HAS_VALUE_FLAG = 1 << 5; + /** * Calculates flags for a given size of variable-length area. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java index ba540654d1..dba531df0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java @@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.util.UUID; import org.apache.ignite.lang.IgniteInternalException; /** @@ -153,35 +152,6 @@ public class ByteUtils { return bytes; } - /** - * Converts a {@link UUID} value to a byte array in Big Endian order and stores it in the specified byte array. - * - * @param uuid UUID. - * @param bytes Bytes array to write result to. - * @param off Offset in the target array to write result to. - * @return Number of bytes overwritten in {@code bytes} array. - */ - public static byte[] putUuidToBytes(UUID uuid, byte[] bytes, int off) { - putLongToBytes(uuid.getMostSignificantBits(), bytes, off); - putLongToBytes(uuid.getLeastSignificantBits(), bytes, off + Long.BYTES); - - return bytes; - } - - /** - * Constructs {@link UUID} from byte array in Big Endian order. - * - * @param bytes Array of bytes. - * @param off Offset in {@code bytes} array. - * @return UUID value. - */ - public static UUID bytesToUuid(byte[] bytes, int off) { - long msb = bytesToLong(bytes, off); - long lsb = bytesToLong(bytes, off + Long.BYTES); - - return new UUID(msb, lsb); - } - /** * Serializes an object to byte array using native java serialization mechanism. * diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java index 9383021234..6e057fc0a3 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PartitionlessLinks.java @@ -74,7 +74,7 @@ public class PartitionlessLinks { * @param link Link or page ID. * @return Number of bytes written (equal to {@link #PARTITIONLESS_LINK_SIZE_BYTES}). */ - public static long writePartitionless(long addr, long link) { + public static int writePartitionless(long addr, long link) { putShort(addr, 0, (short) tag(link)); putInt(addr + Short.BYTES, 0, pageIndex(link)); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java index 0fcb2c7da7..ff2a3b83f1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java @@ -25,24 +25,20 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; -import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; @@ -181,7 +177,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); - assertEquals(res.byteBuffer(), keyValueRow.byteBuffer()); + assertRowEquals(res, keyValueRow); } @Test @@ -213,7 +209,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { BinaryRow res = internalTable.get(keyRow, node.clock().now(), node.node()).get(); - assertEquals(res.byteBuffer(), keyValueRow2.byteBuffer()); + assertRowEquals(res, keyValueRow2); } @Test @@ -247,9 +243,9 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { CountDownLatch latch = new CountDownLatch(1); - List<ByteBuffer> list = new ArrayList<>(); + List<BinaryRow> list = new ArrayList<>(); - res.subscribe(new Subscriber<BinaryRow>() { + res.subscribe(new Subscriber<>() { @Override public void onSubscribe(Subscription subscription) { subscription.request(100); @@ -257,7 +253,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { @Override public void onNext(BinaryRow item) { - list.add(item.byteBuffer()); + list.add(item); } @Override @@ -274,7 +270,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { assertEquals(1, list.size()); - assertEquals(list.get(0), keyValueRow2.byteBuffer()); + assertRowEquals(list.get(0), keyValueRow2); } @Test @@ -302,13 +298,13 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); - assertEquals(res.byteBuffer(), keyValueRow.byteBuffer()); + assertRowEquals(res, keyValueRow); tx2.commit(); res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get(); - assertEquals(res.byteBuffer(), keyValueRow2.byteBuffer()); + assertRowEquals(res, keyValueRow2); } @Test @@ -321,13 +317,13 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2); Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3); - Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3); + List<BinaryRowEx> rowsToSearch = List.of(keyValueRow1, keyValueRow2, keyValueRow3); KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); - Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + List<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); - assertEquals(res.size(), 3); + assertEquals(3, res.size()); node.transactions().runInTransaction(txs -> { for (int i = 0; i < 15; i++) { @@ -337,13 +333,11 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); - assertEquals(res.size(), 3); - - Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet()); + assertEquals(3, res.size()); - assertTrue(resultKeys.contains(keyValueRow1.byteBuffer())); - assertTrue(resultKeys.contains(keyValueRow2.byteBuffer())); - assertTrue(resultKeys.contains(keyValueRow3.byteBuffer())); + for (int i = 0; i < 3; i++) { + assertRowEquals(res.get(i), rowsToSearch.get(i)); + } } @Test @@ -356,25 +350,23 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2); Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3); - Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3); + List<BinaryRowEx> rowsToSearch = List.of(keyValueRow1, keyValueRow2, keyValueRow3); KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); - Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); + List<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); - assertEquals(res.size(), 3); + assertEquals(3, res.size()); populateData(node(), keyValueView, false); res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); - assertEquals(res.size(), 3); - - Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet()); + assertEquals(3, res.size()); - assertTrue(resultKeys.contains(keyValueRow1.byteBuffer())); - assertTrue(resultKeys.contains(keyValueRow2.byteBuffer())); - assertTrue(resultKeys.contains(keyValueRow3.byteBuffer())); + for (int i = 0; i < 3; i++) { + assertRowEquals(res.get(i), rowsToSearch.get(i)); + } node.transactions().runInTransaction(txs -> { for (int i = 0; i < 15; i++) { @@ -388,13 +380,11 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get(); - assertEquals(res.size(), 3); - - resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet()); + assertEquals(3, res.size()); - assertTrue(resultKeys.contains(newKeyValueRow1.byteBuffer())); - assertTrue(resultKeys.contains(newKeyValueRow2.byteBuffer())); - assertTrue(resultKeys.contains(newKeyValueRow3.byteBuffer())); + assertRowEquals(res.get(0), newKeyValueRow1); + assertRowEquals(res.get(1), newKeyValueRow2); + assertRowEquals(res.get(2), newKeyValueRow3); } @Test @@ -554,4 +544,10 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { protected static IgniteImpl node() { return (IgniteImpl) NODE; } + + private static void assertRowEquals(BinaryRow row1, BinaryRow row2) { + assertThat(row1.schemaVersion(), is(row2.schemaVersion())); + assertThat(row1.hasValue(), is(row2.hasValue())); + assertThat(row1.tupleSlice(), is(row2.tupleSlice())); + } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java index 8048e36e5f..a519a48a23 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java @@ -18,25 +18,22 @@ package org.apache.ignite.internal.schema; import java.nio.ByteBuffer; +import java.nio.ByteOrder; /** * Binary row interface. Data layout is described in packages' {@code README.md}. */ public interface BinaryRow { + ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN; + /** Size of schema version field. */ int SCHEMA_VERSION_FLD_LEN = Short.BYTES; /** Row schema version field offset. */ int SCHEMA_VERSION_OFFSET = 0; - /** Size of 'has value' field. */ - int HAS_VALUE_FLD_LEN = Byte.BYTES; - - /** Row 'has value' field offset. */ - int HAS_VALUE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN; - /** Row binary tuple field offset. */ - int TUPLE_OFFSET = HAS_VALUE_OFFSET + HAS_VALUE_FLD_LEN; + int TUPLE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN; /** Get row schema version. */ int schemaVersion(); @@ -44,12 +41,13 @@ public interface BinaryRow { /** Get has value flag: {@code true} if row has non-null value, {@code false} otherwise. */ boolean hasValue(); + /** Length of the {@link #tupleSlice}. */ + int tupleSliceLength(); + /** Get ByteBuffer slice representing the binary tuple. */ ByteBuffer tupleSlice(); - /** Get byte array of the row. */ - byte[] bytes(); - /** Returns the representation of this row as a Byte Buffer. */ + // TODO: remove this method, see https://issues.apache.org/jira/browse/IGNITE-19937 ByteBuffer byteBuffer(); } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java new file mode 100644 index 0000000000..c7d41c452b --- /dev/null +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schema; + +import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG; + +import java.nio.ByteBuffer; + +/** + * Binary row implementation that stores the schema version as a separate field. + */ +public class BinaryRowImpl implements BinaryRow { + private final int schemaVersion; + + private final ByteBuffer binaryTuple; + + /** + * Constructor. + * + * @param schemaVersion Schema version. + * @param binaryTuple Binary tuple. + */ + public BinaryRowImpl(int schemaVersion, ByteBuffer binaryTuple) { + this.schemaVersion = schemaVersion; + this.binaryTuple = binaryTuple; + } + + @Override + public int schemaVersion() { + return schemaVersion; + } + + @Override + public boolean hasValue() { + return (binaryTuple.get(0) & ROW_HAS_VALUE_FLAG) != 0; + } + + @Override + public ByteBuffer tupleSlice() { + return binaryTuple.duplicate().order(ORDER); + } + + @Override + public ByteBuffer byteBuffer() { + return ByteBuffer.allocate(tupleSliceLength() + Short.BYTES) + .order(ORDER) + .putShort((short) schemaVersion()) + .put(tupleSlice()) + .rewind(); + } + + @Override + public int tupleSliceLength() { + return binaryTuple.remaining(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BinaryRowImpl binaryRow = (BinaryRowImpl) o; + + if (schemaVersion != binaryRow.schemaVersion) { + return false; + } + + return binaryTuple.equals(binaryRow.binaryTuple); + } + + @Override + public int hashCode() { + int result = schemaVersion; + result = 31 * result + binaryTuple.hashCode(); + return result; + } +} diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java index d732b3aa15..2eb9ab6594 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.schema; +import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG; + import java.nio.ByteBuffer; -import java.nio.ByteOrder; /** * Heap byte buffer-based row. */ +// TODO: remove this class, see https://issues.apache.org/jira/browse/IGNITE-19937 public class ByteBufferRow implements BinaryRow { - public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN; - /** Row buffer. */ private final ByteBuffer buf; @@ -58,9 +58,7 @@ public class ByteBufferRow implements BinaryRow { @Override public boolean hasValue() { - byte schemaVer = buf.get(HAS_VALUE_OFFSET); - - return schemaVer == 1; + return (buf.get(TUPLE_OFFSET) & ROW_HAS_VALUE_FLAG) != 0; } /** {@inheritDoc} */ @@ -75,19 +73,12 @@ public class ByteBufferRow implements BinaryRow { /** {@inheritDoc} */ @Override - public byte[] bytes() { - // TODO IGNITE-15934 avoid copy. - byte[] tmp = new byte[buf.limit()]; - - buf.get(tmp); - buf.rewind(); - - return tmp; + public ByteBuffer byteBuffer() { + return buf.duplicate().order(ORDER); } - /** {@inheritDoc} */ @Override - public ByteBuffer byteBuffer() { - return buf.slice().order(ORDER); + public int tupleSliceLength() { + return buf.remaining() - TUPLE_OFFSET; } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java index 6b006a9f50..e3030422af 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java @@ -113,15 +113,14 @@ public class Row extends BinaryTupleReader implements BinaryRowEx, SchemaAware, return row.tupleSlice(); } - /** {@inheritDoc} */ @Override - public byte[] bytes() { - return row.bytes(); + public ByteBuffer byteBuffer() { + return row.byteBuffer(); } @Override - public ByteBuffer byteBuffer() { - return row.byteBuffer(); + public int tupleSliceLength() { + return row.tupleSliceLength(); } /** {@inheritDoc} */ @@ -146,4 +145,23 @@ public class Row extends BinaryTupleReader implements BinaryRowEx, SchemaAware, public BinaryTuple binaryTuple() { return new BinaryTuple(binaryTupleSchema.elementCount(), row.tupleSlice()); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Row row1 = (Row) o; + + return row.equals(row1.row); + } + + @Override + public int hashCode() { + return row.hashCode(); + } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java index 3598c24aac..6efccfed08 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.schema.row; -import static org.apache.ignite.internal.schema.BinaryRow.HAS_VALUE_FLD_LEN; -import static org.apache.ignite.internal.schema.BinaryRow.SCHEMA_VERSION_FLD_LEN; -import static org.apache.ignite.internal.schema.ByteBufferRow.ORDER; +import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG; import java.math.BigDecimal; import java.math.BigInteger; @@ -34,8 +32,8 @@ import java.util.UUID; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.schema.AssemblyException; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.BitmaskNativeType; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.Columns; import org.apache.ignite.internal.schema.DecimalNativeType; @@ -653,13 +651,13 @@ public class RowAssembler { * @return Created {@link BinaryRow}. */ public static BinaryRow build(ByteBuffer binTupleBuffer, int schemaVersion, boolean hasValue) { - ByteBuffer buffer = ByteBuffer.allocate(SCHEMA_VERSION_FLD_LEN + HAS_VALUE_FLD_LEN + binTupleBuffer.limit()).order(ORDER); - buffer.putShort((short) schemaVersion); - buffer.put(hasValue ? (byte) 1 : 0); - buffer.put(binTupleBuffer); - buffer.position(0); + if (hasValue) { + byte flags = binTupleBuffer.get(0); - return new ByteBufferRow(buffer); + binTupleBuffer.put(0, (byte) (flags | ROW_HAS_VALUE_FLAG)); + } + + return new BinaryRowImpl(schemaVersion, binTupleBuffer); } /** diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java index a0d9e8aff1..cd7d79bd29 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java @@ -499,9 +499,9 @@ public class KvMarshallerTest { BinaryRow row4 = marshaller4.marshal(1L, new TestPojoWrapper(serializedPojo)); // Verify all rows are equivalent. - assertArrayEquals(row.bytes(), row2.bytes()); - assertArrayEquals(row.bytes(), row3.bytes()); - assertArrayEquals(row.bytes(), row4.bytes()); + assertEquals(row, row2); + assertEquals(row, row3); + assertEquals(row, row4); // Check key. assertEquals(1L, marshaller1.unmarshalKey(new Row(schema, row))); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java index 2e41e9aefd..c9d7659512 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java @@ -83,7 +83,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor } /** - * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}. + * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, int, int)}. */ @Test public void testAddWrite() { diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java index 6f0f07a14d..7a3e5ebefd 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java @@ -28,7 +28,6 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -1030,13 +1029,14 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows ) { for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) { - List<byte[]> allVersions = mvPartitionStorage.runConsistently(locker -> { + List<BinaryRow> allVersions = mvPartitionStorage.runConsistently(locker -> { locker.lock(row.get1()); - return toListOfByteArrays(mvPartitionStorage.scanVersions(row.get1())); + return toListOfBinaryRows(mvPartitionStorage.scanVersions(row.get1())); }); - assertThat(allVersions, containsInAnyOrder(row.get2().bytes())); + assertThat(allVersions, hasSize(1)); + assertRowMatches(allVersions.get(0), row.get2()); IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1()); IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1()); @@ -1056,9 +1056,9 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { assertEquals(expLastAppliedTerm, storage.lastAppliedTerm()); } - private static List<byte[]> toListOfByteArrays(Cursor<ReadResult> cursor) { + private static List<BinaryRow> toListOfBinaryRows(Cursor<ReadResult> cursor) { try (cursor) { - return cursor.stream().map(ReadResult::binaryRow).map(BinaryRow::bytes).collect(toList()); + return cursor.stream().map(ReadResult::binaryRow).collect(toList()); } } diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java index 164d1fbbdc..9a751332e5 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -146,7 +145,9 @@ public abstract class BaseMvStoragesTest { protected final void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, BinaryRow expectedRow) { assertThat(rowUnderQuestion, is(notNullValue())); - assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes()))); + assertThat(rowUnderQuestion.schemaVersion(), is(expectedRow.schemaVersion())); + assertThat(rowUnderQuestion.hasValue(), is(expectedRow.hasValue())); + assertThat(rowUnderQuestion.tupleSlice(), is(expectedRow.tupleSlice())); } /** diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java index 1cb8845d4a..1b466baa2b 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java @@ -81,7 +81,7 @@ class AbortWriteInvokeClosure implements InvokeClosure<VersionChain> { operationType = OperationType.REMOVE; } - previousUncommittedRowVersion = storage.rowVersionToBinaryRow(latestVersion); + previousUncommittedRowVersion = latestVersion.value(); } @Override diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java index c4d43d137b..ca43e10670 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure; import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure; import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.PartitionTimestampCursor; import org.apache.ignite.internal.storage.ReadResult; @@ -91,8 +90,6 @@ import org.jetbrains.annotations.Nullable; * </ul> */ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitionStorage { - private static final byte[] TOMBSTONE_PAYLOAD = new byte[0]; - static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp -> true; static final Predicate<HybridTimestamp> DONT_LOAD_VALUE = timestamp -> false; @@ -365,9 +362,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio return writeIntentToResult(versionChain, rowVersion, newestCommitTs); } else { - BinaryRow row = rowVersionToBinaryRow(rowVersion); - - return ReadResult.createFromCommitted(versionChain.rowId(), row, rowVersion.timestamp()); + return ReadResult.createFromCommitted(versionChain.rowId(), rowVersion.value(), rowVersion.timestamp()); } } @@ -401,14 +396,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio return findRowVersion.getResult(); } - @Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) { - if (rowVersion.isTombstone()) { - return null; - } - - return new ByteBufferRow(rowVersion.value()); - } - /** * Finds row version by timestamp. See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details on the API. * @@ -480,7 +467,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio if (curCommit.isTombstone()) { row = null; } else { - row = new ByteBufferRow(curCommit.value()); + row = curCommit.value(); } return ReadResult.createFromCommitted(chain.rowId(), row, curCommit.timestamp()); @@ -503,11 +490,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio int commitTableId = chain.commitTableId(); int commitPartitionId = chain.commitPartitionId(); - BinaryRow row = rowVersionToBinaryRow(rowVersion); - return ReadResult.createFromWriteIntent( chain.rowId(), - row, + rowVersion.value(), transactionId, commitTableId, commitPartitionId, @@ -523,11 +508,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio } } - static byte[] rowBytes(@Nullable BinaryRow row) { - // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations. - return row == null ? TOMBSTONE_PAYLOAD : row.bytes(); - } - @Override public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId) throws TxIdMismatchException, StorageException { @@ -946,7 +926,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio RowVersion removedRowVersion = removeWriteOnGc(rowId, rowTimestamp, gcRowVersion.getLink()); - return rowVersionToBinaryRow(removedRowVersion); + return removedRowVersion.value(); } private RowVersion removeWriteOnGc(RowId rowId, HybridTimestamp rowTimestamp, long rowLink) { diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java index 02876a9ce1..40d44e9364 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java @@ -19,9 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.mv; import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; import static org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE; -import static org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes; -import java.nio.ByteBuffer; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.tree.BplusTree; import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure; @@ -127,9 +125,7 @@ class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> { } private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row, HybridTimestamp commitTimestamp, long nextPartitionlessLink) { - byte[] rowBytes = rowBytes(row); - - RowVersion rowVersion = new RowVersion(storage.partitionId, commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes)); + RowVersion rowVersion = new RowVersion(storage.partitionId, commitTimestamp, nextPartitionlessLink, row); storage.insertRowVersion(rowVersion); diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java index e2fd214319..2058bac162 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java @@ -19,9 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.mv; import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; import static org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE; -import static org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes; -import java.nio.ByteBuffer; import java.util.UUID; import org.apache.ignite.internal.pagememory.tree.BplusTree; import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure; @@ -96,7 +94,7 @@ class AddWriteInvokeClosure implements InvokeClosure<VersionChain> { if (oldRow.isUncommitted()) { RowVersion currentVersion = storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE); - previousUncommittedRowVersion = storage.rowVersionToBinaryRow(currentVersion); + previousUncommittedRowVersion = currentVersion.value(); // As we replace an uncommitted version with new one, we need to remove old uncommitted version. toRemove = currentVersion; @@ -125,9 +123,7 @@ class AddWriteInvokeClosure implements InvokeClosure<VersionChain> { } private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) { - byte[] rowBytes = rowBytes(row); - - RowVersion rowVersion = new RowVersion(storage.partitionId, nextPartitionlessLink, ByteBuffer.wrap(rowBytes)); + RowVersion rowVersion = new RowVersion(storage.partitionId, nextPartitionlessLink, row); storage.insertRowVersion(rowVersion); diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java index b42e3e9417..03a8833cab 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java @@ -27,7 +27,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal; import org.apache.ignite.internal.pagememory.io.DataPagePayload; import org.apache.ignite.internal.pagememory.util.PageUtils; -import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowImpl; +import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter; import org.jetbrains.annotations.Nullable; @@ -51,6 +53,8 @@ class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> { private int rowValueSize; + private int schemaVersion; + private @Nullable RowVersion result; FindRowVersion(int partitionId, boolean loadValueBytes) { @@ -75,6 +79,7 @@ class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> { rowLink = link; rowTimestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET); rowNextLink = nextLink; + schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET)); if (loadValueBytes) { return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null); @@ -96,9 +101,11 @@ class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> { byte[] valueBytes = readRowVersionValue.result(); - ByteBuffer value = ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER); + BinaryRow row = valueBytes.length == 0 + ? null + : new BinaryRowImpl(schemaVersion, ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER)); - result = new RowVersion(partitionId, rowLink, rowTimestamp, rowNextLink, value); + result = new RowVersion(partitionId, rowLink, rowTimestamp, rowNextLink, row); } else { result = new RowVersion(partitionId, rowLink, rowTimestamp, rowNextLink, rowValueSize); } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java index ef44a3a70c..a6af6ef551 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java @@ -26,7 +26,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal; import org.apache.ignite.internal.pagememory.io.DataPagePayload; import org.apache.ignite.internal.pagememory.util.PageUtils; -import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowImpl; +import org.apache.ignite.internal.schema.BinaryTuple; import org.jetbrains.annotations.Nullable; /** @@ -45,6 +47,8 @@ class ReadRowVersion implements PageMemoryTraversal<Predicate<HybridTimestamp>> private long nextLink; + private int schemaVersion; + private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue(); ReadRowVersion(int partitionId) { @@ -67,6 +71,7 @@ class ReadRowVersion implements PageMemoryTraversal<Predicate<HybridTimestamp>> timestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET); nextLink = readPartitionless(partitionId, pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET); + schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET)); if (!loadValue.test(timestamp)) { int valueSize = PageUtils.getInt(pageAddr, payload.offset() + RowVersion.VALUE_SIZE_OFFSET); @@ -90,9 +95,11 @@ class ReadRowVersion implements PageMemoryTraversal<Predicate<HybridTimestamp>> byte[] valueBytes = readRowVersionValue.result(); - ByteBuffer value = ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER); + BinaryRow row = valueBytes.length == 0 + ? null + : new BinaryRowImpl(schemaVersion, ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER)); - result = new RowVersion(partitionIdFromLink(firstFragmentLink), firstFragmentLink, timestamp, nextLink, value); + result = new RowVersion(partitionIdFromLink(firstFragmentLink), firstFragmentLink, timestamp, nextLink, row); } RowVersion result() { diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java index a006e8d57c..e849393ca9 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java @@ -20,12 +20,12 @@ package org.apache.ignite.internal.storage.pagememory.mv; import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE; import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; -import java.nio.ByteBuffer; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.Storable; import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo; import org.apache.ignite.internal.pagememory.io.IoVersions; import org.apache.ignite.internal.pagememory.util.PartitionlessLinks; +import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo; import org.apache.ignite.internal.tostring.IgniteToStringExclude; import org.apache.ignite.internal.tostring.S; @@ -37,11 +37,13 @@ import org.jetbrains.annotations.Nullable; public final class RowVersion implements Storable { private static final int NEXT_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES; private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES; + private static final int SCHEMA_VERSION_SIZE_BYTES = Short.BYTES; public static final int TIMESTAMP_OFFSET = 0; public static final int NEXT_LINK_OFFSET = TIMESTAMP_OFFSET + HYBRID_TIMESTAMP_SIZE; public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + NEXT_LINK_STORE_SIZE_BYTES; - public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES; + public static final int SCHEMA_VERSION_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES; + public static final int VALUE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_SIZE_BYTES; private final int partitionId; @@ -54,32 +56,32 @@ public final class RowVersion implements Storable { private final int valueSize; @IgniteToStringExclude - private final @Nullable ByteBuffer value; + private final @Nullable BinaryRow value; /** * Constructor. */ - public RowVersion(int partitionId, long nextLink, ByteBuffer value) { + public RowVersion(int partitionId, long nextLink, @Nullable BinaryRow value) { this(partitionId, 0, null, nextLink, value); } /** * Constructor. */ - public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long nextLink, ByteBuffer value) { + public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long nextLink, @Nullable BinaryRow value) { this(partitionId, 0, commitTimestamp, nextLink, value); } /** * Constructor. */ - public RowVersion(int partitionId, long link, @Nullable HybridTimestamp timestamp, long nextLink, @Nullable ByteBuffer value) { + public RowVersion(int partitionId, long link, @Nullable HybridTimestamp timestamp, long nextLink, @Nullable BinaryRow value) { this.partitionId = partitionId; link(link); this.timestamp = timestamp; this.nextLink = nextLink; - this.valueSize = value == null ? 0 : value.limit(); + this.valueSize = value == null ? 0 : value.tupleSliceLength(); this.value = value; } @@ -111,7 +113,7 @@ public final class RowVersion implements Storable { return valueSize; } - public @Nullable ByteBuffer value() { + public @Nullable BinaryRow value() { return value; } @@ -120,17 +122,9 @@ public final class RowVersion implements Storable { } boolean isTombstone() { - return isTombstone(valueSize()); - } - - static boolean isTombstone(int valueSize) { return valueSize == 0; } - static boolean isTombstone(byte[] valueBytes) { - return isTombstone(valueBytes.length); - } - boolean isUncommitted() { return timestamp == null; } @@ -140,30 +134,28 @@ public final class RowVersion implements Storable { } @Override - public final void link(long link) { + public void link(long link) { this.link = link; } @Override - public final long link() { + public long link() { return link; } @Override - public final int partition() { + public int partition() { return partitionId; } @Override public int size() { - assert value != null; - - return headerSize() + value.limit(); + return headerSize() + valueSize; } @Override public int headerSize() { - return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES; + return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES + SCHEMA_VERSION_SIZE_BYTES; } @Override diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java index 0616b8cd1f..420aa19794 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java @@ -21,8 +21,6 @@ import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; import static org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE; import java.util.NoSuchElementException; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; @@ -102,16 +100,12 @@ class ScanVersionsCursor implements Cursor<ReadResult> { if (rowVersion.isTombstone()) { return ReadResult.empty(rowId); } else { - BinaryRow row = new ByteBufferRow(rowVersion.value()); - - return ReadResult.createFromCommitted(rowId, row, rowVersion.timestamp()); + return ReadResult.createFromCommitted(rowId, rowVersion.value(), rowVersion.timestamp()); } } else { - BinaryRow row = rowVersion.isTombstone() ? null : new ByteBufferRow(rowVersion.value()); - return ReadResult.createFromWriteIntent( rowId, - row, + rowVersion.value(), versionChain.transactionId(), versionChain.commitTableId(), versionChain.commitPartitionId(), diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java index 9c5d8037d7..9f606c3d73 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo; import org.apache.ignite.internal.pagememory.io.IoVersions; import org.apache.ignite.internal.pagememory.util.PartitionlessLinks; +import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.storage.pagememory.mv.HybridTimestamps; import org.apache.ignite.internal.storage.pagememory.mv.RowVersion; import org.apache.ignite.lang.IgniteStringBuilder; @@ -50,45 +51,69 @@ public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> { } @Override - protected void writeRowData(long pageAddr, int dataOff, int payloadSize, RowVersion row, boolean newRow) { + protected void writeRowData(long pageAddr, int dataOff, int payloadSize, RowVersion rowVersion, boolean newRow) { assertPageType(pageAddr); - long addr = pageAddr + dataOff; + int offset = dataOff; - putShort(addr, 0, (short) payloadSize); - addr += Short.BYTES; + putShort(pageAddr, offset, (short) payloadSize); + offset += Short.BYTES; - addr += HybridTimestamps.writeTimestampToMemory(addr, 0, row.timestamp()); + offset += HybridTimestamps.writeTimestampToMemory(pageAddr, offset, rowVersion.timestamp()); - addr += writePartitionless(addr, row.nextLink()); + offset += writePartitionless(pageAddr + offset, rowVersion.nextLink()); - putInt(addr, 0, row.valueSize()); - addr += Integer.BYTES; + putInt(pageAddr, offset, rowVersion.valueSize()); + offset += Integer.BYTES; - putByteBuffer(addr, 0, row.value()); + BinaryRow row = rowVersion.value(); + + if (row != null) { + putShort(pageAddr, offset, (short) row.schemaVersion()); + offset += Short.BYTES; + + putByteBuffer(pageAddr, offset, row.tupleSlice()); + } else { + putShort(pageAddr, offset, (short) 0); + } } @Override - protected void writeFragmentData(RowVersion row, ByteBuffer pageBuf, int rowOff, int payloadSize) { + protected void writeFragmentData(RowVersion rowVersion, ByteBuffer pageBuf, int rowOff, int payloadSize) { assertPageType(pageBuf); + int headerSize = rowVersion.headerSize(); + + BinaryRow row = rowVersion.value(); + + int bufferOffset; + int bufferSize; + if (rowOff == 0) { // first fragment - assert row.headerSize() <= payloadSize : "Header must entirely fit in the first fragment, but header size is " - + row.headerSize() + " and payload size is " + payloadSize; + assert headerSize <= payloadSize : "Header must entirely fit in the first fragment, but header size is " + + headerSize + " and payload size is " + payloadSize; + + HybridTimestamps.writeTimestampToBuffer(pageBuf, rowVersion.timestamp()); - HybridTimestamps.writeTimestampToBuffer(pageBuf, row.timestamp()); + PartitionlessLinks.writeToBuffer(pageBuf, rowVersion.nextLink()); - PartitionlessLinks.writeToBuffer(pageBuf, row.nextLink()); + pageBuf.putInt(rowVersion.valueSize()); - pageBuf.putInt(row.valueSize()); + pageBuf.putShort(row == null ? 0 : (short) row.schemaVersion()); - putValueBufferIntoPage(pageBuf, row.value(), 0, payloadSize - row.headerSize()); + bufferOffset = 0; + bufferSize = payloadSize - headerSize; } else { // non-first fragment - assert rowOff >= row.headerSize(); + assert rowOff >= headerSize; + + bufferOffset = rowOff - headerSize; + bufferSize = payloadSize; + } - putValueBufferIntoPage(pageBuf, row.value(), rowOff - row.headerSize(), payloadSize); + if (row != null) { + putValueBufferIntoPage(pageBuf, row.tupleSlice(), bufferOffset, bufferSize); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java index 5a1b55c707..b6fe2fbf3c 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java @@ -23,7 +23,7 @@ import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SI import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET; -import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER; +import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; @@ -34,7 +34,6 @@ import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TAB import java.nio.ByteBuffer; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.gc.GcEntry; import org.jetbrains.annotations.Nullable; @@ -246,7 +245,7 @@ class GarbageCollector { assert valueBytes.length > 0; // Can't be a tombstone. - var row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER)); + var row = deserializeRow(ByteBuffer.wrap(valueBytes)); // Delete the row from the data cf. batch.delete(partCf, dataKey); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java index 2a782efc11..9ea674b5e8 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java @@ -34,6 +34,8 @@ import java.nio.ByteOrder; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.rocksdb.RocksUtils; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; @@ -251,6 +253,18 @@ public final class PartitionDataHelper implements ManuallyCloseable { return it; } + /** + * Converts an internal serialized presentation of a binary row into its Java Object counterpart. + */ + static BinaryRow deserializeRow(ByteBuffer buffer) { + assert buffer.order() == ByteOrder.BIG_ENDIAN; + + int schemaVersion = Short.toUnsignedInt(buffer.getShort()); + ByteBuffer binaryTupleSlice = buffer.slice().order(TABLE_ROW_BYTE_ORDER); + + return new BinaryRowImpl(schemaVersion, binaryTupleSlice); + } + @Override public void close() { RocksUtils.closeAll(upperBoundReadOpts, upperBound); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index d41024a759..922d9caa5f 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -19,19 +19,17 @@ package org.apache.ignite.internal.storage.rocksdb; import static java.lang.ThreadLocal.withInitial; import static java.nio.ByteBuffer.allocate; +import static java.nio.ByteBuffer.allocateDirect; import static java.util.Arrays.copyOf; import static java.util.Arrays.copyOfRange; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER; -import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_OFFSET; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_PREFIX_SIZE; -import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ID_OFFSET; -import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.THREAD_LOCAL_STATE; -import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TX_ID_OFFSET; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_HEADER_SIZE; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_OFFSET; +import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc; import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX; @@ -43,11 +41,7 @@ import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState; import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance; import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance; -import static org.apache.ignite.internal.util.ByteUtils.bytesToLong; -import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid; -import static org.apache.ignite.internal.util.ByteUtils.putUuidToBytes; -import static org.apache.ignite.internal.util.GridUnsafe.getInt; -import static org.apache.ignite.internal.util.GridUnsafe.getShort; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -61,7 +55,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter; import org.apache.ignite.internal.rocksdb.RocksUtils; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.PartitionTimestampCursor; import org.apache.ignite.internal.storage.ReadResult; @@ -72,9 +65,7 @@ import org.apache.ignite.internal.storage.TxIdMismatchException; import org.apache.ignite.internal.storage.gc.GcEntry; import org.apache.ignite.internal.storage.util.LocalLocker; import org.apache.ignite.internal.storage.util.StorageState; -import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.Cursor; -import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.lang.IgniteStringFormatter; import org.jetbrains.annotations.Nullable; @@ -114,20 +105,22 @@ import org.rocksdb.WriteBatchWithIndex; * <p>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions. * * <p>DESC means that timestamps are sorted from newest to oldest (N2O). - * Please refer to {@link PartitionDataHelper#putTimestampDesc(ByteBuffer, HybridTimestamp)} - * to see how it's achieved. Missing timestamp could be interpreted as a moment infinitely far away in the future. + * Please refer to {@link PartitionDataHelper#putTimestampDesc(ByteBuffer, HybridTimestamp)} to see how it's achieved. Missing timestamp + * could be interpreted as a moment infinitely far away in the future. */ public class RocksDbMvPartitionStorage implements MvPartitionStorage { /** Thread-local on-heap byte buffer instance to use for key manipulations. */ private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER)); + private static final ThreadLocal<ByteBuffer> DIRECT_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER)); + /** Table storage instance. */ private final RocksDbTableStorage tableStorage; /** * Partition ID (should be treated as an unsigned short). * - * <p/>Partition IDs are always stored in the big endian order, since they need to be compared lexicographically. + * <p>Partition IDs are always stored in the big endian order, since they need to be compared lexicographically. */ private final int partitionId; @@ -187,7 +180,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { helper = new PartitionDataHelper(tableId, partitionId, tableStorage.partitionCfHandle()); gc = new GarbageCollector(helper, db, tableStorage.gcQueueHandle()); - lastAppliedIndexAndTermKey = createKey(PARTITION_META_PREFIX, tableId, partitionId); lastGroupConfigKey = createKey(PARTITION_CONF_PREFIX, tableId, partitionId); @@ -331,7 +323,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { @Override public byte @Nullable [] committedGroupConfiguration() { - byte[] array = busy(() -> { + byte[] array = busy(() -> { ThreadLocalState state = THREAD_LOCAL_STATE.get(); return state == null ? lastGroupConfig : state.pendingGroupConfig; @@ -374,40 +366,43 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { assert rowIsLocked(rowId); - ByteBuffer keyBuf = prepareHeapKeyBuf(rowId); + ByteBuffer keyBuf = prepareHeapKeyBuf(rowId).rewind(); BinaryRow res = null; try { // Check concurrent transaction data. - byte[] keyBufArray = keyBuf.array(); + byte[] keyBytes = new byte[ROW_PREFIX_SIZE]; - byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE); + keyBuf.get(keyBytes); - byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes); + keyBuf.rewind(); + + byte[] previousValueBytes = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes); // Previous value must belong to the same transaction. - if (previousValue != null) { + if (previousValueBytes != null) { + ByteBuffer previousValue = ByteBuffer.wrap(previousValueBytes); + validateTxId(previousValue, txId); res = wrapValueIntoBinaryRow(previousValue, true); } if (row == null) { + ByteBuffer value = allocate(VALUE_HEADER_SIZE); + // Write empty value as a tombstone. - if (previousValue != null) { + if (previousValueBytes != null) { // Reuse old array with transaction id already written to it. - writeBatch.put(helper.partCf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE)); + value.put(previousValueBytes, 0, VALUE_HEADER_SIZE); } else { - byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE]; - - putUuidToBytes(txId, valueHeaderBytes, TX_ID_OFFSET); - putInt(valueHeaderBytes, TABLE_ID_OFFSET, commitTableId); - - writeBatch.put(helper.partCf, keyBytes, valueHeaderBytes); + writeHeader(value, txId, commitTableId, commitPartitionId); } + + writeBatch.put(helper.partCf, keyBytes, value.array()); } else { - writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId); + writeUnversioned(keyBytes, row, txId, commitTableId, commitPartitionId); } } catch (RocksDBException e) { throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e); @@ -429,24 +424,37 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { throws RocksDBException { @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch(); - byte[] rowBytes = rowBytes(row); + ByteBuffer value = allocate(rowSize(row) + VALUE_HEADER_SIZE); - ByteBuffer value = allocate(rowBytes.length + VALUE_HEADER_SIZE); - byte[] array = value.array(); + writeHeader(value, txId, commitTableId, commitPartitionId); - putUuidToBytes(txId, array, TX_ID_OFFSET); - putInt(array, TABLE_ID_OFFSET, commitTableId); - putShort(array, PARTITION_ID_OFFSET, (short) commitPartitionId); - - value.position(VALUE_OFFSET).put(rowBytes); + writeBinaryRow(value, row); // Write table row data as a value. - writeBatch.put(helper.partCf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array()); + writeBatch.put(helper.partCf, keyArray, value.array()); + } + + private static int rowSize(BinaryRow row) { + // Tuple + schema version. + return row.tupleSliceLength() + Short.BYTES; + } + + private static void writeHeader(ByteBuffer dest, UUID txId, int commitTableId, int commitPartitionId) { + assert dest.order() == ByteOrder.BIG_ENDIAN; + + dest + .putLong(txId.getMostSignificantBits()) + .putLong(txId.getLeastSignificantBits()) + .putInt(commitTableId) + .putShort((short) commitPartitionId); } - private static byte[] rowBytes(BinaryRow row) { - //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations. - return row.bytes(); + private static void writeBinaryRow(ByteBuffer dest, BinaryRow row) { + assert dest.order() == ByteOrder.BIG_ENDIAN; + + dest + .putShort((short) row.schemaVersion()) + .put(row.tupleSlice()); } @Override @@ -458,10 +466,16 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { assert rowIsLocked(rowId); - ByteBuffer keyBuf = prepareHeapKeyBuf(rowId); + ByteBuffer keyBuf = prepareDirectKeyBuf(rowId) + .position(0) + .limit(ROW_PREFIX_SIZE); try { - byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE); + byte[] keyBytes = new byte[ROW_PREFIX_SIZE]; + + keyBuf.get(keyBytes); + + keyBuf.rewind(); byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes); @@ -471,9 +485,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } // Perform unconditional remove for the key without associated timestamp. - writeBatch.delete(helper.partCf, keyBytes); + writeBatch.delete(helper.partCf, keyBuf); - return wrapValueIntoBinaryRow(previousValue, true); + return wrapValueIntoBinaryRow(ByteBuffer.wrap(previousValue), true); } catch (RocksDBException e) { throw new StorageException("Failed to roll back insert/update", e); } @@ -546,9 +560,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { boolean isNewValueTombstone = row == null; - //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations. - byte[] rowBytes = row != null ? rowBytes(row) : ArrayUtils.BYTE_EMPTY_ARRAY; - boolean newAndPrevTombstones; // Both this and previous values for the row id are tombstones. try { newAndPrevTombstones = gc.tryAddToGcQueue(writeBatch, rowId, commitTimestamp, isNewValueTombstone); @@ -559,8 +570,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { // We only write tombstone if the previous value for the same row id was not a tombstone. // So there won't be consecutive tombstones for the same row id. if (!newAndPrevTombstones) { + //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations. + byte[] rowBytes; + + if (row == null) { + rowBytes = BYTE_EMPTY_ARRAY; + } else { + ByteBuffer rowBuffer = allocate(rowSize(row)); + + writeBinaryRow(rowBuffer, row); + + rowBytes = rowBuffer.array(); + } + try { - writeBatch.put(helper.partCf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes); + writeBatch.put(helper.partCf, keyBuf.array(), rowBytes); } catch (RocksDBException e) { throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e); } @@ -606,23 +630,25 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } private ReadResult readLatestVersion(RowId rowId, RocksIterator seekIterator) { - ByteBuffer keyBuf = prepareHeapKeyBuf(rowId); + ByteBuffer keyBuf = prepareDirectKeyBuf(rowId); // Seek to the first appearance of row id if timestamp isn't set. // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version. - // Unfortunately, copy here is unavoidable with current API. assert keyBuf.position() == ROW_PREFIX_SIZE; - seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE)); + + seekIterator.seek(keyBuf.duplicate().position(0).limit(ROW_PREFIX_SIZE)); if (invalid(seekIterator)) { // No data at all. return ReadResult.empty(rowId); } - ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE); + ByteBuffer readKeyBuf = DIRECT_KEY_BUFFER.get().clear(); int keyLength = seekIterator.key(readKeyBuf); + readKeyBuf.position(0).limit(keyLength); + if (!matches(rowId, readKeyBuf)) { // It is already a different row, so no version exists for our rowId. return ReadResult.empty(rowId); @@ -630,14 +656,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; - byte[] valueBytes = seekIterator.value(); + ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value()); return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes); } - private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) { - assert valueBytes != null; - + private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, ByteBuffer valueBytes) { RowId rowId = getRowId(keyBuf); if (!isWriteIntent) { @@ -671,7 +695,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { /** * Walks "version chain" via the iterator to find a row by timestamp. - * See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details. + * + * <p>See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details. * * @param seekIterator Iterator, on which seek operation was already performed. * @param rowId Row id. @@ -684,7 +709,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key. // To avoid returning its value, we have to check that actual key matches what we need. // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations. - ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE); + ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().clear(); int keyLength = 0; @@ -703,7 +728,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return ReadResult.empty(rowId); } - foundKeyBuf.rewind().limit(MAX_KEY_SIZE); + foundKeyBuf.clear(); keyLength = seekIterator.key(foundKeyBuf); if (!matches(rowId, foundKeyBuf)) { @@ -711,11 +736,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return ReadResult.empty(rowId); } - byte[] valueBytes = seekIterator.value(); - boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; if (isWriteIntent) { + ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value()); + // Let's check if there is a committed write. seekIterator.next(); @@ -724,7 +749,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return wrapUncommittedValue(rowId, valueBytes, null); } - foundKeyBuf.rewind().limit(MAX_KEY_SIZE); + foundKeyBuf.clear(); seekIterator.key(foundKeyBuf); if (!matches(rowId, foundKeyBuf)) { @@ -741,7 +766,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { HybridTimestamp rowTimestamp = readTimestampDesc(foundKeyBuf); - byte[] valueBytes = seekIterator.value(); + ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value()); if (rowTimestamp.equals(timestamp)) { // This is exactly the row we are looking for. @@ -757,7 +782,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return wrapCommittedValue(rowId, valueBytes, rowTimestamp); } - foundKeyBuf.rewind().limit(MAX_KEY_SIZE); + foundKeyBuf.clear(); keyLength = seekIterator.key(foundKeyBuf); if (!matches(rowId, foundKeyBuf)) { @@ -768,7 +793,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; if (isWriteIntent) { - return wrapUncommittedValue(rowId, seekIterator.value(), rowTimestamp); + return wrapUncommittedValue(rowId, ByteBuffer.wrap(seekIterator.value()), rowTimestamp); } return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(foundKeyBuf)); @@ -819,7 +844,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; - return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value); + return readResultFromKeyAndValue( + isWriteIntent, + ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), + ByteBuffer.wrap(value) + ); } @Override @@ -880,7 +909,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return busy(() -> { throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo); - ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).rewind().limit(ROW_PREFIX_SIZE); + ByteBuffer keyBuf = prepareDirectKeyBuf(lowerBound) + .position(0) + .limit(ROW_PREFIX_SIZE); try (RocksIterator it = db.newIterator(helper.partCf, helper.scanReadOpts)) { it.seek(keyBuf); @@ -891,13 +922,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return null; } - ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().rewind().limit(ROW_PREFIX_SIZE); + keyBuf.rewind(); - it.key(readKeyBuf); + it.key(keyBuf); - return getRowId(readKeyBuf); - } finally { - keyBuf.limit(MAX_KEY_SIZE); + return getRowId(keyBuf); } }); } @@ -1012,29 +1041,36 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { * Prepares thread-local on-heap byte buffer. Writes row id in it. Partition id is already there. Timestamp is not cleared. */ private ByteBuffer prepareHeapKeyBuf(RowId rowId) { - assert rowId.partitionId() == partitionId : rowId; + ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().clear(); - ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().rewind(); + writeKey(keyBuf, rowId); - keyBuf.putInt(tableStorage.getTableId()); - keyBuf.putShort((short) rowId.partitionId()); + return keyBuf; + } + + private ByteBuffer prepareDirectKeyBuf(RowId rowId) { + ByteBuffer keyBuf = DIRECT_KEY_BUFFER.get().clear(); - helper.putRowId(keyBuf, rowId); + writeKey(keyBuf, rowId); return keyBuf; } - private static void putShort(byte[] array, int off, short value) { - GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value); - } + private void writeKey(ByteBuffer buffer, RowId rowId) { + assert buffer.order() == KEY_BYTE_ORDER; + assert rowId.partitionId() == partitionId : rowId; - private static void putInt(byte[] array, int off, int value) { - GridUnsafe.putInt(array, GridUnsafe.BYTE_ARR_OFF + off, value); + buffer.putInt(tableStorage.getTableId()); + buffer.putShort((short) rowId.partitionId()); + + helper.putRowId(buffer, rowId); } - private static void validateTxId(byte[] valueBytes, UUID txId) { - long msb = bytesToLong(valueBytes); - long lsb = bytesToLong(valueBytes, Long.BYTES); + private static void validateTxId(ByteBuffer valueBytes, UUID txId) { + long msb = valueBytes.getLong(); + long lsb = valueBytes.getLong(); + + valueBytes.rewind(); if (txId.getMostSignificantBits() != msb || txId.getLeastSignificantBits() != lsb) { throw new TxIdMismatchException(txId, new UUID(msb, lsb)); @@ -1067,37 +1103,41 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { * @param valueHasTxData Whether the value has a transaction id prefix in it. * @return Binary row instance or {@code null} if value is a tombstone. */ - private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[] valueBytes, boolean valueHasTxData) { + private static @Nullable BinaryRow wrapValueIntoBinaryRow(ByteBuffer valueBytes, boolean valueHasTxData) { if (isTombstone(valueBytes, valueHasTxData)) { return null; } - return valueHasTxData - ? new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(TABLE_ROW_BYTE_ORDER)) - : new ByteBufferRow(valueBytes); + assert valueBytes.order() == ByteOrder.BIG_ENDIAN; + + if (valueHasTxData) { + valueBytes.position(VALUE_OFFSET); + } + + return deserializeRow(valueBytes); } /** - * Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if - * it is not {@code null}. + * Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if it is not + * {@code null}. * * @param rowId ID of the corresponding row. - * @param valueBytes Value bytes as read from the storage. + * @param valueBuffer Value bytes as read from the storage. * @param newestCommitTs Commit timestamp of the most recent committed write of this value. * @return Read result instance. */ - private static ReadResult wrapUncommittedValue(RowId rowId, byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) { - UUID txId = bytesToUuid(valueBytes, TX_ID_OFFSET); - int commitTableId = getInt(valueBytes, GridUnsafe.BYTE_ARR_OFF + TABLE_ID_OFFSET); - int commitPartitionId = getShort(valueBytes, GridUnsafe.BYTE_ARR_OFF + PARTITION_ID_OFFSET) & 0xFFFF; + private static ReadResult wrapUncommittedValue(RowId rowId, ByteBuffer valueBuffer, @Nullable HybridTimestamp newestCommitTs) { + assert valueBuffer.order() == ByteOrder.BIG_ENDIAN; - BinaryRow row; + UUID txId = new UUID(valueBuffer.getLong(), valueBuffer.getLong()); - if (isTombstone(valueBytes, true)) { - row = null; - } else { - row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(TABLE_ROW_BYTE_ORDER)); - } + int commitTableId = valueBuffer.getInt(); + + int commitPartitionId = Short.toUnsignedInt(valueBuffer.getShort()); + + BinaryRow row = valueBuffer.remaining() == 0 ? null : deserializeRow(valueBuffer); + + valueBuffer.rewind(); return ReadResult.createFromWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, newestCommitTs); } @@ -1110,19 +1150,23 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { * @param rowCommitTimestamp Timestamp with which the row was committed. * @return Read result instance or {@code null} if value is a tombstone. */ - private static ReadResult wrapCommittedValue(RowId rowId, byte[] valueBytes, HybridTimestamp rowCommitTimestamp) { - if (isTombstone(valueBytes, false)) { + private static ReadResult wrapCommittedValue(RowId rowId, ByteBuffer valueBytes, HybridTimestamp rowCommitTimestamp) { + if (valueBytes.remaining() == 0) { return ReadResult.empty(rowId); } - return ReadResult.createFromCommitted(rowId, new ByteBufferRow(valueBytes), rowCommitTimestamp); + return ReadResult.createFromCommitted( + rowId, + deserializeRow(valueBytes), + rowCommitTimestamp + ); } /** * Returns {@code true} if value payload represents a tombstone. */ - private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) { - return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0); + private static boolean isTombstone(ByteBuffer valueBytes, boolean hasTxId) { + return valueBytes.limit() == (hasTxId ? VALUE_HEADER_SIZE : 0); } private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor { @@ -1218,11 +1262,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { currentRowId = null; // Prepare direct buffer slice to read keys from the iterator. - ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().rewind(); + ByteBuffer currentKeyBuffer = DIRECT_KEY_BUFFER.get(); while (true) { - currentKeyBuffer.rewind(); - // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any // other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later // in this code. Now let's describe what we'll find, assuming that iterator found something: @@ -1246,11 +1288,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } // Read the actual key into a direct buffer. - int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE)); + int keyLength = it.key(currentKeyBuffer.clear()); - boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; + currentKeyBuffer.position(0).limit(keyLength); - currentKeyBuffer.limit(ROW_PREFIX_SIZE); + boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; RowId rowId = getRowId(currentKeyBuffer); @@ -1291,18 +1333,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } } - currentKeyBuffer.limit(keyLength); - assert valueBytes != null; - ReadResult readResult; + ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes); - if (!isWriteIntent) { - // There is no write-intent, return latest committed row. - readResult = wrapCommittedValue(rowId, valueBytes, readTimestampDesc(currentKeyBuffer)); - } else { - readResult = wrapUncommittedValue(rowId, valueBytes, nextCommitTimestamp); - } + ReadResult readResult = isWriteIntent + ? wrapUncommittedValue(rowId, valueBuffer, nextCommitTimestamp) + // There is no write-intent, return latest committed row. + : wrapCommittedValue(rowId, valueBuffer, readTimestampDesc(currentKeyBuffer)); if (!readResult.isEmpty() || readResult.isWriteIntent()) { next = readResult; @@ -1336,7 +1374,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { currentRowId = null; // Prepare direct buffer slice to read keys from the iterator. - ByteBuffer directBuffer = MV_KEY_BUFFER.get().rewind(); + ByteBuffer directBuffer = DIRECT_KEY_BUFFER.get(); while (true) { //TODO IGNITE-18201 Remove copying. @@ -1347,7 +1385,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } // We need to figure out what current row id is. - it.key(directBuffer.rewind()); + it.key(directBuffer.clear()); RowId rowId = getRowId(directBuffer); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java index 8e9f93565a..dabd1d3d2d 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java @@ -17,7 +17,8 @@ package org.apache.ignite.distributed; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -41,7 +42,6 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -442,14 +442,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest subscriberAllDataAwaitLatch.await(); - assertEquals(submittedItems.size(), retrievedItems.size()); - - List<byte[]> expItems = submittedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList()); - List<byte[]> gotItems = retrievedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList()); - - for (int i = 0; i < expItems.size(); i++) { - assertArrayEquals(expItems.get(i), gotItems.get(i)); - } + assertThat(submittedItems, is(retrievedItems)); if (tx != null) { tx.commit(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java index 83a62f14d9..f3c99a37ab 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java @@ -32,7 +32,6 @@ import static org.mockito.Mockito.when; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import java.nio.ByteBuffer; import java.nio.file.Path; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; @@ -228,20 +227,21 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex); Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage); - RowId rowId = primaryIndex.get(req0.binaryRow().byteBuffer()); + RowId rowId = primaryIndex.get(req0.binaryRowBytes()); + BinaryRow row = partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); return completedFuture(row); } // Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE. - BinaryRow binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRow() : null; + ByteBuffer binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowBytes() : null; UpdateCommand cmd = msgFactory.updateCommand() .txId(req0.transactionId()) .tablePartitionId(tablePartitionId(new TablePartitionId(1, 0))) .rowUuid(new RowId(0).uuid()) - .rowBuffer(binaryRow == null ? null : binaryRow.byteBuffer()) + .rowBuffer(binaryRow) .safeTimeLong(hybridClock.nowLong()) .build(); @@ -319,7 +319,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti return false; } - return Arrays.equals(value.bytes(), read.binaryRow().tupleSlice().array()); + return value.tupleSlice().equals(read.binaryRow().tupleSlice()); }; } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 879b357fd2..c28c31dc3c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -49,6 +49,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -62,7 +63,6 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; @@ -196,9 +196,9 @@ public class ItColocationTest { RaftGroupService r = groupRafts.get(request.groupId()); if (request instanceof ReadWriteMultiRowReplicaRequest) { - Map<UUID, ByteBuffer> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRows() + Map<UUID, ByteBuffer> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowsBytes() .stream() - .collect(toMap(row -> TestTransactionIds.newTransactionId(), BinaryRow::byteBuffer)); + .collect(toMap(row -> TestTransactionIds.newTransactionId(), Function.identity())); return r.run(MSG_FACTORY.updateAllCommand() .tablePartitionId(MSG_FACTORY.tablePartitionIdMessage() diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index b9d9c88c3c..84b23325d3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1835,7 +1835,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForInsert(searchRow, rowId0, txId) .thenCompose(rowIdLock -> applyUpdateCommand( - updateCommand(commitPartitionId, rowId0.uuid(), searchRow.byteBuffer(), txId)) + updateCommand(commitPartitionId, rowId0.uuid(), searchRow, txId)) .thenApply(ignored -> rowIdLock)) .thenApply(rowIdLock -> { // Release short term locks. @@ -1857,7 +1857,7 @@ public class PartitionReplicaListener implements ReplicaListener { return lockFut .thenCompose(rowIdLock -> applyUpdateCommand( - updateCommand(commitPartitionId, rowId0.uuid(), searchRow.byteBuffer(), txId)) + updateCommand(commitPartitionId, rowId0.uuid(), searchRow, txId)) .thenApply(ignored -> rowIdLock)) .thenApply(rowIdLock -> { // Release short term locks. @@ -1879,7 +1879,7 @@ public class PartitionReplicaListener implements ReplicaListener { return lockFut .thenCompose(rowIdLock -> applyUpdateCommand( - updateCommand(commitPartitionId, rowId0.uuid(), searchRow.byteBuffer(), txId)) + updateCommand(commitPartitionId, rowId0.uuid(), searchRow, txId)) .thenApply(ignored -> rowIdLock)) .thenApply(rowIdLock -> { // Release short term locks. @@ -1897,7 +1897,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForUpdate(searchRow, rowId, txId) .thenCompose(rowIdLock -> applyUpdateCommand( - updateCommand(commitPartitionId, rowId.uuid(), searchRow.byteBuffer(), txId)) + updateCommand(commitPartitionId, rowId.uuid(), searchRow, txId)) .thenApply(ignored -> rowIdLock)) .thenApply(rowIdLock -> { // Release short term locks. @@ -1915,7 +1915,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForUpdate(searchRow, rowId, txId) .thenCompose(rowLock -> applyUpdateCommand( - updateCommand(commitPartitionId, rowId.uuid(), searchRow.byteBuffer(), txId)) + updateCommand(commitPartitionId, rowId.uuid(), searchRow, txId)) .thenApply(ignored -> rowLock)) .thenApply(rowIdLock -> { // Release short term locks. @@ -2077,7 +2077,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return applyUpdateCommand( - updateCommand(commitPartitionId, validatedRowId.get1().uuid(), newRow.byteBuffer(), txId)) + updateCommand(commitPartitionId, validatedRowId.get1().uuid(), newRow, txId)) .thenApply(ignored -> validatedRowId) .thenApply(rowIdLock -> { // Release short term locks. @@ -2331,19 +2331,19 @@ public class PartitionReplicaListener implements ReplicaListener { * * @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with. * @param rowUuid Row UUID. - * @param rowBuf {@link ByteBuffer} representation of {@link BinaryRow}. + * @param row Row. * @param txId Transaction ID. * @return Constructed {@link UpdateCommand} object. */ - private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID rowUuid, @Nullable ByteBuffer rowBuf, UUID txId) { + private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID rowUuid, @Nullable BinaryRow row, UUID txId) { UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand() .tablePartitionId(tablePartitionId(tablePartId)) .rowUuid(rowUuid) .txId(txId) .safeTimeLong(hybridClock.nowLong()); - if (rowBuf != null) { - bldr.rowBuffer(rowBuf); + if (row != null) { + bldr.rowBuffer(row.byteBuffer()); } return bldr.build(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java index 47fae65462..f0aa965be3 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java @@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Random; import java.util.stream.IntStream; import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.DecimalNativeType; import org.apache.ignite.internal.schema.NativeType; @@ -112,7 +111,7 @@ public class ColocationHashCalculationTest { Tuple t = TableRow.tuple(r); t.set(rndCol.name(), SchemaTestUtils.generateRandomValue(rnd, rndCol.type())); - r = new Row(schema, new ByteBufferRow(marshaller.marshal(t).bytes())); + r = marshaller.marshal(t); assertEquals(colocationHash(r), r.colocationHash()); } @@ -129,7 +128,7 @@ public class ColocationHashCalculationTest { t.set(c.name(), SchemaTestUtils.generateRandomValue(rnd, c.type())); } - return new Row(schema, new ByteBufferRow(marshaller.marshal(t).bytes())); + return marshaller.marshal(t); } private int colocationHash(Row r) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java index 56bb7b6f85..d99c167236 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/MutableRowTupleAdapterTest.java @@ -53,7 +53,6 @@ import java.time.temporal.Temporal; import java.util.BitSet; import java.util.Random; import java.util.UUID; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.InvalidTypeException; import org.apache.ignite.internal.schema.NativeTypes; @@ -222,7 +221,7 @@ public class MutableRowTupleAdapterTest { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema)); - Row row = new Row(schema, new ByteBufferRow(marshaller.marshal(original).bytes())); + Row row = marshaller.marshal(original); Tuple key = TableRow.keyTuple(row); Tuple val = TableRow.valueTuple(row); @@ -248,7 +247,7 @@ public class MutableRowTupleAdapterTest { public void testRowTupleMutability() throws TupleMarshallerException { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema)); - Row row = new Row(schema, new ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")).bytes())); + Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")); Tuple tuple = TableRow.tuple(row); Tuple key = TableRow.keyTuple(row); @@ -275,7 +274,7 @@ public class MutableRowTupleAdapterTest { public void testKeyValueTupleMutability() throws TupleMarshallerException { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema)); - Row row = new Row(schema, new ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")).bytes())); + Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")); Tuple tuple = TableRow.tuple(row); Tuple key = TableRow.keyTuple(row); @@ -304,7 +303,7 @@ public class MutableRowTupleAdapterTest { public void testRowTupleSchemaAwareness() throws TupleMarshallerException { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema)); - Row row = new Row(schema, new ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")).bytes())); + Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")); Tuple tuple = TableRow.tuple(row); Tuple key = TableRow.keyTuple(row); @@ -327,7 +326,7 @@ public class MutableRowTupleAdapterTest { public void testKeyValueTupleSchemaAwareness() throws TupleMarshallerException { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema)); - Row row = new Row(schema, new ByteBufferRow(marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")).bytes())); + Row row = marshaller.marshal(Tuple.create().set("id", 1L).set("name", "Shirt")); Tuple tuple = TableRow.tuple(row); Tuple key = TableRow.keyTuple(row); @@ -372,7 +371,7 @@ public class MutableRowTupleAdapterTest { .set("valNumberCol", BigInteger.valueOf(rnd.nextLong())) .set("valDecimalCol", BigDecimal.valueOf(rnd.nextLong(), 5)); - Tuple rowTuple = TableRow.tuple(new Row(fullSchema, new ByteBufferRow(marshaller.marshal(tuple).bytes()))); + Tuple rowTuple = TableRow.tuple(marshaller.marshal(tuple)); assertEquals(tuple, rowTuple); @@ -406,7 +405,7 @@ public class MutableRowTupleAdapterTest { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(fullSchema)); - Row row = new Row(fullSchema, new ByteBufferRow(marshaller.marshal(tup1).bytes())); + Row row = marshaller.marshal(tup1); Tuple tup2 = deserializeTuple(serializeTuple(TableRow.tuple(row))); @@ -454,11 +453,11 @@ public class MutableRowTupleAdapterTest { // Check tuples backed with Row. TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(fullSchema)); - Row row = new Row(fullSchema, new ByteBufferRow(marshaller.marshal(keyTuple, valTuple).bytes())); + Row row = marshaller.marshal(keyTuple, valTuple); Tuple rowKeyTuple = TableRow.keyTuple(row); Tuple rowValTuple = TableRow.valueTuple(row); - final Tuple rowTuple = TableRow.tuple(new Row(fullSchema, new ByteBufferRow(marshaller.marshal(tuple).bytes()))); + final Tuple rowTuple = TableRow.tuple(marshaller.marshal(tuple)); assertEquals(keyTuple, rowKeyTuple); assertEquals(rowKeyTuple, keyTuple); @@ -537,7 +536,7 @@ public class MutableRowTupleAdapterTest { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(fullSchema)); - Row row = new Row(fullSchema, new ByteBufferRow(marshaller.marshal(key1, val1).bytes())); + Row row = marshaller.marshal(key1, val1); Tuple key2 = deserializeTuple(serializeTuple(TableRow.keyTuple(row))); Tuple val2 = deserializeTuple(serializeTuple(TableRow.valueTuple(row))); @@ -568,7 +567,7 @@ public class MutableRowTupleAdapterTest { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schemaDescriptor)); - Row row = new Row(schemaDescriptor, new ByteBufferRow(marshaller.marshal(tuple).bytes())); + Row row = marshaller.marshal(tuple); Tuple tuple1 = deserializeTuple(serializeTuple(TableRow.tuple(row))); @@ -594,14 +593,14 @@ public class MutableRowTupleAdapterTest { .set("string", "abcefghi") .set("bytes", new byte[]{1, 2, 3, 4, 5}); - assertThrowsWithCause(() -> marshaller.marshal(tuple1).bytes(), InvalidTypeException.class, "Column's type mismatch"); - assertThrowsWithCause(() -> marshaller.marshal(tuple2).bytes(), InvalidTypeException.class, "Column's type mismatch"); + assertThrowsWithCause(() -> marshaller.marshal(tuple1), InvalidTypeException.class, "Column's type mismatch"); + assertThrowsWithCause(() -> marshaller.marshal(tuple2), InvalidTypeException.class, "Column's type mismatch"); Tuple expected = Tuple.create().set("key", 1) .set("string", "abc") .set("bytes", new byte[]{1, 2, 3}); - Row row = new Row(schemaDescriptor, new ByteBufferRow(marshaller.marshal(expected).bytes())); + Row row = marshaller.marshal(expected); assertEquals(expected, deserializeTuple(serializeTuple(TableRow.tuple(row)))); } @@ -619,7 +618,7 @@ public class MutableRowTupleAdapterTest { Tuple tuple1 = Tuple.create().set("key", 1).set("decimal", new BigDecimal("123456.7")); - assertThrowsWithCause(() -> marshaller.marshal(tuple1).bytes(), SchemaMismatchException.class, + assertThrowsWithCause(() -> marshaller.marshal(tuple1), SchemaMismatchException.class, "Failed to set decimal value for column 'decimal' (max precision exceeds allocated precision)"); } @@ -637,7 +636,7 @@ public class MutableRowTupleAdapterTest { Tuple tuple = Tuple.create().set("key", 1).set("decimal", new BigDecimal("123.458")); Tuple expected = Tuple.create().set("key", 1).set("decimal", new BigDecimal("123.46")); // Rounded. - Row row = new Row(schemaDescriptor, new ByteBufferRow(marshaller.marshal(tuple).bytes())); + Row row = marshaller.marshal(tuple); assertEquals(expected, deserializeTuple(serializeTuple(TableRow.tuple(row)))); } @@ -700,7 +699,7 @@ public class MutableRowTupleAdapterTest { TupleMarshaller marshaller = new TupleMarshallerImpl(new DummySchemaManagerImpl(schema)); - return TableRow.tuple(new Row(schema, new ByteBufferRow(marshaller.marshal(original).bytes()))); + return TableRow.tuple(marshaller.marshal(original)); } catch (TupleMarshallerException e) { return fail(); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 89ea3eb4ab..e9fbfe4e29 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -29,7 +29,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -1073,7 +1072,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); - if (row != null && Arrays.equals(binaryRow.bytes(), row.bytes())) { + if (rowEquals(binaryRow, row)) { found = true; } } @@ -1084,7 +1083,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); - assertTrue(row == null || !Arrays.equals(row.bytes(), binaryRow.bytes())); + assertTrue(row == null || !rowEquals(row, binaryRow)); } } @@ -1230,7 +1229,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { if (expected == null) { assertNull(res); } else { - assertArrayEquals(expected.bytes(), res.bytes()); + assertTrue(rowEquals(expected, res)); } } @@ -1655,6 +1654,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } } + private static boolean rowEquals(BinaryRow row1, BinaryRow row2) { + return row1.schemaVersion() == row2.schemaVersion() + && row1.hasValue() == row2.hasValue() + && row1.tupleSlice().equals(row2.tupleSlice()); + } /** * Test pojo key. diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java index a658999c56..8223090ebd 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.type; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -273,10 +272,7 @@ public class NumericTypesSerializerTest { Row firstRow = marshaller.marshal(firstTup); Row secondRow = marshaller.marshal(secondTup); - byte[] firstRowInBytes = firstRow.bytes(); - byte[] secondRowInBytes = secondRow.bytes(); - - assertArrayEquals(firstRowInBytes, secondRowInBytes); + assertEquals(firstRow, secondRow); } @NotNull