This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new cdddfae611 IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408) cdddfae611 is described below commit cdddfae611f9e69d67743dcf5d3ca068a94f7479 Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Fri Aug 4 16:07:57 2023 +0300 IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408) --- .idea/codeStyles/Project.xml | 3 +- .../apache/ignite/internal/schema/BinaryRow.java | 16 ----- .../ignite/internal/schema/BinaryRowImpl.java | 11 +-- .../ignite/internal/schema/ByteBufferRow.java | 84 ---------------------- .../org/apache/ignite/internal/schema/row/Row.java | 5 -- .../sql/engine/exec/UpdatableTableImpl.java | 53 +++++++------- .../exec/rel/TableScanNodeExecutionTest.java | 3 +- .../AbstractMvPartitionStorageConcurrencyTest.java | 16 ++--- .../internal/storage/BaseMvStoragesTest.java | 5 +- .../storage/rocksdb/PartitionDataHelper.java | 6 +- .../ignite/distributed/ItTablePersistenceTest.java | 21 +++--- .../ignite/distributed/ReplicaUnavailableTest.java | 27 ++++--- .../ignite/internal/table/ItColocationTest.java | 13 ++-- .../table/distributed/StorageUpdateHandler.java | 16 ++--- .../table/distributed/TableMessageGroup.java | 6 ++ .../distributed/command/UpdateAllCommand.java | 4 +- .../table/distributed/command/UpdateCommand.java | 13 +++- .../table/distributed/raft/PartitionListener.java | 2 +- .../snapshot/incoming/IncomingSnapshotCopier.java | 7 +- .../snapshot/message/SnapshotMvDataResponse.java | 4 +- .../raft/snapshot/outgoing/OutgoingSnapshot.java | 24 ++++--- ...owReplicaRequest.java => BinaryRowMessage.java} | 24 +++---- .../request/MultipleRowReplicaRequest.java | 12 ++-- .../request/SingleRowReplicaRequest.java | 6 +- .../replication/request/SwapRowReplicaRequest.java | 10 ++- .../replicator/PartitionReplicaListener.java | 80 ++++++++++++--------- .../distributed/storage/InternalTableImpl.java | 79 ++++++++++---------- .../internal/table/distributed/IndexBaseTest.java | 32 ++++----- .../PartitionRaftCommandsSerializationTest.java | 41 ++++++----- .../raft/PartitionCommandListenerTest.java | 40 ++++++----- .../incoming/IncomingSnapshotCopierTest.java | 11 ++- .../OutgoingSnapshotMvDataStreamingTest.java | 79 +++++++++++--------- .../PartitionReplicaListenerIndexLockingTest.java | 12 +++- .../replication/PartitionReplicaListenerTest.java | 61 ++++++++-------- 34 files changed, 387 insertions(+), 439 deletions(-) diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml index 51cbc4108f..6f8eb3a91e 100644 --- a/.idea/codeStyles/Project.xml +++ b/.idea/codeStyles/Project.xml @@ -166,6 +166,7 @@ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" /> <option name="TERNARY_OPERATION_WRAP" value="1" /> <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" /> + <option name="KEEP_SIMPLE_LAMBDAS_IN_ONE_LINE" value="true" /> <option name="FOR_STATEMENT_WRAP" value="1" /> <option name="ARRAY_INITIALIZER_WRAP" value="1" /> <option name="WRAP_COMMENTS" value="true" /> @@ -589,4 +590,4 @@ </indentOptions> </codeStyleSettings> </code_scheme> -</component> +</component> \ No newline at end of file diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java index a519a48a23..336a1abcfd 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java @@ -18,23 +18,11 @@ package org.apache.ignite.internal.schema; import java.nio.ByteBuffer; -import java.nio.ByteOrder; /** * Binary row interface. Data layout is described in packages' {@code README.md}. */ public interface BinaryRow { - ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN; - - /** Size of schema version field. */ - int SCHEMA_VERSION_FLD_LEN = Short.BYTES; - - /** Row schema version field offset. */ - int SCHEMA_VERSION_OFFSET = 0; - - /** Row binary tuple field offset. */ - int TUPLE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN; - /** Get row schema version. */ int schemaVersion(); @@ -46,8 +34,4 @@ public interface BinaryRow { /** Get ByteBuffer slice representing the binary tuple. */ ByteBuffer tupleSlice(); - - /** Returns the representation of this row as a Byte Buffer. */ - // TODO: remove this method, see https://issues.apache.org/jira/browse/IGNITE-19937 - ByteBuffer byteBuffer(); } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java index c7d41c452b..ee40a0a45f 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java @@ -52,16 +52,7 @@ public class BinaryRowImpl implements BinaryRow { @Override public ByteBuffer tupleSlice() { - return binaryTuple.duplicate().order(ORDER); - } - - @Override - public ByteBuffer byteBuffer() { - return ByteBuffer.allocate(tupleSliceLength() + Short.BYTES) - .order(ORDER) - .putShort((short) schemaVersion()) - .put(tupleSlice()) - .rewind(); + return binaryTuple.duplicate().order(BinaryTuple.ORDER); } @Override diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java deleted file mode 100644 index 2eb9ab6594..0000000000 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.schema; - -import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG; - -import java.nio.ByteBuffer; - -/** - * Heap byte buffer-based row. - */ -// TODO: remove this class, see https://issues.apache.org/jira/browse/IGNITE-19937 -public class ByteBufferRow implements BinaryRow { - /** Row buffer. */ - private final ByteBuffer buf; - - /** - * Constructor. - * - * @param data Array representation of the row. - */ - public ByteBufferRow(byte[] data) { - this(ByteBuffer.wrap(data).order(ORDER)); - } - - /** - * Constructor. - * - * @param buf Buffer representing the row. - */ - public ByteBufferRow(ByteBuffer buf) { - assert buf.order() == ORDER; - assert buf.position() == 0; - - this.buf = buf; - } - - /** {@inheritDoc} */ - @Override - public int schemaVersion() { - return Short.toUnsignedInt(buf.getShort(SCHEMA_VERSION_OFFSET)); - } - - @Override - public boolean hasValue() { - return (buf.get(TUPLE_OFFSET) & ROW_HAS_VALUE_FLAG) != 0; - } - - /** {@inheritDoc} */ - @Override - public ByteBuffer tupleSlice() { - try { - return buf.position(TUPLE_OFFSET).slice().order(ORDER); - } finally { - buf.position(0); // Reset bounds. - } - } - - /** {@inheritDoc} */ - @Override - public ByteBuffer byteBuffer() { - return buf.duplicate().order(ORDER); - } - - @Override - public int tupleSliceLength() { - return buf.remaining() - TUPLE_OFFSET; - } -} diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java index e3030422af..f8562916b9 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java @@ -113,11 +113,6 @@ public class Row extends BinaryTupleReader implements BinaryRowEx, SchemaAware, return row.tupleSlice(); } - @Override - public ByteBuffer byteBuffer() { - return row.byteBuffer(); - } - @Override public int tupleSliceLength() { return row.tupleSliceLength(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java index 310423aca4..2d15526ef8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java @@ -23,7 +23,6 @@ import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -47,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.sql.SqlException; @@ -78,7 +78,7 @@ public final class UpdatableTableImpl implements UpdatableTable { /** * Constructor. * - * @param desc Table descriptor. + * @param desc Table descriptor. */ public UpdatableTableImpl( int tableId, @@ -137,7 +137,7 @@ public final class UpdatableTableImpl implements UpdatableTable { ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() .groupId(partGroupId) .commitPartitionId(commitPartitionId) - .binaryRowsBytes(serializeBinaryRows(partToRows.getValue())) + .binaryRowMessages(serializeBinaryRows(partToRows.getValue())) .transactionId(txAttributes.id()) .term(nodeWithTerm.term()) .requestType(RequestType.RW_UPSERT_ALL) @@ -150,11 +150,16 @@ public final class UpdatableTableImpl implements UpdatableTable { return CompletableFuture.allOf(futures); } - private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow> rows) { - var result = new ArrayList<ByteBuffer>(rows.size()); + private static List<BinaryRowMessage> serializeBinaryRows(Collection<BinaryRow> rows) { + var result = new ArrayList<BinaryRowMessage>(rows.size()); for (BinaryRow row : rows) { - result.add(row.byteBuffer()); + BinaryRowMessage message = MESSAGES_FACTORY.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); + + result.add(message); } return result; @@ -197,7 +202,7 @@ public final class UpdatableTableImpl implements UpdatableTable { ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() .groupId(partGroupId) .commitPartitionId(commitPartitionId) - .binaryRowsBytes(serializeBinaryRows(partToRows.getValue())) + .binaryRowMessages(serializeBinaryRows(partToRows.getValue())) .transactionId(txAttributes.id()) .term(nodeWithTerm.term()) .requestType(RequestType.RW_INSERT_ALL) @@ -205,26 +210,26 @@ public final class UpdatableTableImpl implements UpdatableTable { .build(); futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(), request) - .thenApply(result -> { - Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result; + .thenApply(result -> { + Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result; - if (binaryRows.isEmpty()) { - return List.of(); - } + if (binaryRows.isEmpty()) { + return List.of(); + } - List<RowT> conflictRows = new ArrayList<>(binaryRows.size()); - IgniteTypeFactory typeFactory = ectx.getTypeFactory(); - RowHandler.RowFactory<RowT> rowFactory = handler.factory( - ectx.getTypeFactory(), - desc.insertRowType(typeFactory) - ); + List<RowT> conflictRows = new ArrayList<>(binaryRows.size()); + IgniteTypeFactory typeFactory = ectx.getTypeFactory(); + RowHandler.RowFactory<RowT> rowFactory = handler.factory( + ectx.getTypeFactory(), + desc.insertRowType(typeFactory) + ); - for (BinaryRow row : binaryRows) { - conflictRows.add(rowConverter.toRow(ectx, row, rowFactory, null)); - } + for (BinaryRow row : binaryRows) { + conflictRows.add(rowConverter.toRow(ectx, row, rowFactory, null)); + } - return conflictRows; - }); + return conflictRows; + }); } return handleInsertResults(handler, futures); @@ -260,7 +265,7 @@ public final class UpdatableTableImpl implements UpdatableTable { ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() .groupId(partGroupId) .commitPartitionId(commitPartitionId) - .binaryRowsBytes(serializeBinaryRows(partToRows.getValue())) + .binaryRowMessages(serializeBinaryRows(partToRows.getValue())) .transactionId(txAttributes.id()) .term(nodeWithTerm.term()) .requestType(RequestType.RW_DELETE_ALL) diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index da376fd68c..eca7bca58a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryTuplePrefix; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl; @@ -133,7 +132,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest { private final int dataAmount; - private final ByteBufferRow bbRow = new ByteBufferRow(new byte[1]); + private final BinaryRow bbRow = mock(BinaryRow.class); private final CopyOnWriteArraySet<Integer> partitions = new CopyOnWriteArraySet<>(); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java index 03051669e1..53c9cc25e8 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.storage; -import static java.util.stream.Collectors.toCollection; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -25,10 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Stream; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.jetbrains.annotations.Nullable; @@ -203,13 +200,14 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa addAndCommit.perform(this, null); - Collection<ByteBuffer> rows = Stream.of(TABLE_ROW, TABLE_ROW2) - .map(BinaryRow::byteBuffer) - .collect(toCollection(ConcurrentLinkedQueue::new)); + Collection<BinaryRow> rows = new ConcurrentLinkedQueue<>(); + + rows.add(TABLE_ROW); + rows.add(TABLE_ROW2); runRace( - () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(), rows), - () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(), rows) + () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow(), rows), + () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow(), rows) ); assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE)); @@ -220,7 +218,7 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa } } - private void assertRemoveRow(ByteBuffer rowBytes, Collection<ByteBuffer> rows) { + private static void assertRemoveRow(@Nullable BinaryRow rowBytes, Collection<BinaryRow> rows) { assertNotNull(rowBytes); assertTrue(rows.remove(rowBytes), rowBytes.toString()); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java index 9a751332e5..b879bdfdb3 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; +import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; @@ -77,7 +78,9 @@ public abstract class BaseMvStoragesTest { protected static BinaryRow binaryRow(TestKey key, TestValue value) { try { - return kvMarshaller.marshal(key, value); + Row row = kvMarshaller.marshal(key, value); + + return new BinaryRowImpl(row.schemaVersion(), row.tupleSlice()); } catch (MarshallerException e) { throw new IgniteException(e); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java index 9ea674b5e8..6c24c08689 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java @@ -36,7 +36,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.rocksdb.RocksUtils; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowImpl; -import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RowId; @@ -77,8 +77,6 @@ public final class PartitionDataHelper implements ManuallyCloseable { /** Value offset (if transaction state is present). */ static final int VALUE_OFFSET = VALUE_HEADER_SIZE; - static final ByteOrder TABLE_ROW_BYTE_ORDER = ByteBufferRow.ORDER; - /** Thread-local direct buffer instance to read keys from RocksDB. */ static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER)); @@ -260,7 +258,7 @@ public final class PartitionDataHelper implements ManuallyCloseable { assert buffer.order() == ByteOrder.BIG_ENDIAN; int schemaVersion = Short.toUnsignedInt(buffer.getShort()); - ByteBuffer binaryTupleSlice = buffer.slice().order(TABLE_ROW_BYTE_ORDER); + ByteBuffer binaryTupleSlice = buffer.slice().order(BinaryTuple.ORDER); return new BinaryRowImpl(schemaVersion, binaryTupleSlice); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java index f3c99a37ab..86ca7097e1 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; -import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.HashMap; import java.util.LinkedList; @@ -55,6 +54,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; @@ -81,6 +81,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; @@ -226,8 +227,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti int storageIndex = stoppedNodeIndex == 0 ? 1 : 0; MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex); - Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage); - RowId rowId = primaryIndex.get(req0.binaryRowBytes()); + Map<BinaryRow, RowId> primaryIndex = rowsToRowIds(partitionStorage); + RowId rowId = primaryIndex.get(req0.binaryRow()); BinaryRow row = partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); @@ -235,13 +236,13 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti } // Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE. - ByteBuffer binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowBytes() : null; + BinaryRowMessage binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowMessage() : null; UpdateCommand cmd = msgFactory.updateCommand() .txId(req0.transactionId()) .tablePartitionId(tablePartitionId(new TablePartitionId(1, 0))) .rowUuid(new RowId(0).uuid()) - .rowBuffer(binaryRow) + .rowMessage(binaryRow) .safeTimeLong(hybridClock.nowLong()) .build(); @@ -303,11 +304,11 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti MvPartitionStorage storage = getListener(restarted, raftGroupId()).getMvStorage(); return () -> { - Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage); + Map<BinaryRow, RowId> primaryIndex = rowsToRowIds(storage); Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE; - RowId rowId = primaryIndex.get(value.byteBuffer()); + RowId rowId = primaryIndex.get(new BinaryRowImpl(value.schemaVersion(), value.tupleSlice())); if (rowId == null) { return false; @@ -323,8 +324,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti }; } - private static Map<ByteBuffer, RowId> rowsToRowIds(MvPartitionStorage storage) { - Map<ByteBuffer, RowId> result = new HashMap<>(); + private static Map<BinaryRow, RowId> rowsToRowIds(MvPartitionStorage storage) { + Map<BinaryRow, RowId> result = new HashMap<>(); RowId rowId = storage.closestRowId(RowId.lowestRowId(0)); @@ -332,7 +333,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti BinaryRow binaryRow = storage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); if (binaryRow != null) { - result.put(binaryRow.byteBuffer(), rowId); + result.put(binaryRow, rowId); } RowId incremented = rowId.increment(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index 51a589ae8d..e7dab29686 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -28,11 +28,11 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.nio.ByteBuffer; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -51,12 +51,14 @@ import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaResponse; +import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.internal.testframework.IgniteAbstractTest; @@ -149,7 +151,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { .transactionId(TestTransactionIds.newTransactionId()) .commitPartitionId(tablePartitionId) .timestampLong(clock.nowLong()) - .binaryRowBytes(createKeyValueRow(1L, 1L)) + .binaryRowMessage(createKeyValueRow(1L, 1L)) .requestType(RequestType.RW_GET) .build(); @@ -191,7 +193,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { .transactionId(TestTransactionIds.newTransactionId()) .commitPartitionId(tablePartitionId) .timestampLong(clock.nowLong()) - .binaryRowBytes(createKeyValueRow(1L, 1L)) + .binaryRowMessage(createKeyValueRow(1L, 1L)) .requestType(RequestType.RW_GET) .build(); @@ -223,7 +225,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { .transactionId(TestTransactionIds.newTransactionId()) .commitPartitionId(tablePartitionId) .timestampLong(clock.nowLong()) - .binaryRowBytes(createKeyValueRow(1L, 1L)) + .binaryRowMessage(createKeyValueRow(1L, 1L)) .requestType(RequestType.RW_GET) .build(); @@ -242,10 +244,10 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { e1 = e; } - assertTrue(e0 != null); + assertNotNull(e0); assertTrue(unwrapCause(e0) instanceof ReplicationException, e0.toString()); - assertTrue(e1 != null); + assertNotNull(e1); assertTrue(unwrapCause(e1) instanceof ReplicationException, e1.toString()); } @@ -280,7 +282,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { .transactionId(TestTransactionIds.newTransactionId()) .commitPartitionId(tablePartitionId) .timestampLong(clock.nowLong()) - .binaryRowBytes(createKeyValueRow(1L, 1L)) + .binaryRowMessage(createKeyValueRow(1L, 1L)) .requestType(RequestType.RW_GET) .build(); @@ -292,17 +294,22 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { e0 = e; } - assertTrue(e0 != null); + assertNotNull(e0); assertTrue(unwrapCause(e0) instanceof ReplicationTimeoutException, e0.toString()); assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException) unwrapCause(e0)).code()); } - private static ByteBuffer createKeyValueRow(long id, long value) { + private BinaryRowMessage createKeyValueRow(long id, long value) { RowAssembler rowBuilder = new RowAssembler(SCHEMA); rowBuilder.appendLong(id); rowBuilder.appendLong(value); - return rowBuilder.build().byteBuffer(); + BinaryRow row = rowBuilder.build(); + + return tableMessagesFactory.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index b412c75a3f..0f5fa3bf5d 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -35,7 +35,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.math.BigDecimal; import java.math.BigInteger; -import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -64,7 +63,6 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRowEx; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeType; import org.apache.ignite.internal.schema.NativeTypeSpec; @@ -78,6 +76,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; @@ -197,7 +196,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest { RaftGroupService r = groupRafts.get(request.groupId()); if (request instanceof ReadWriteMultiRowReplicaRequest) { - Map<UUID, ByteBuffer> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowsBytes() + Map<UUID, BinaryRowMessage> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowMessages() .stream() .collect(toMap(row -> TestTransactionIds.newTransactionId(), Function.identity())); @@ -221,7 +220,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest { .build() ) .rowUuid(UUID.randomUUID()) - .rowBuffer(((ReadWriteSingleRowReplicaRequest) request).binaryRow().byteBuffer()) + .rowMessage(((ReadWriteSingleRowReplicaRequest) request).binaryRowMessage()) .txId(TestTransactionIds.newTransactionId()) .build()); } @@ -394,14 +393,14 @@ public class ItColocationTest extends BaseIgniteAbstractTest { partsMap.merge(part, 1, (cnt, ignore) -> ++cnt); } - assertEquals(partsMap.size(), CMDS_MAP.size()); + assertEquals(CMDS_MAP.size(), partsMap.size()); CMDS_MAP.forEach((p, set) -> { UpdateAllCommand cmd = (UpdateAllCommand) CollectionUtils.first(set); assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> "part=" + p + ", set=" + set); - cmd.rowsToUpdate().values().forEach(byteBuffer -> { - Row r = new Row(schema, new ByteBufferRow(byteBuffer)); + cmd.rowsToUpdate().values().forEach(rowMessage -> { + Row r = new Row(schema, rowMessage.asBinaryRow()); assertEquals(INT_TABLE.partition(r), p); }); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index 110be7852c..146ff6620a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.table.distributed; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -32,13 +31,13 @@ import java.util.function.Consumer; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; @@ -102,20 +101,19 @@ public class StorageUpdateHandler { * @param txId Transaction id. * @param rowUuid Row UUID. * @param commitPartitionId Commit partition id. - * @param rowBuffer Row buffer. + * @param row Row. * @param onApplication Callback on application. */ public void handleUpdate( UUID txId, UUID rowUuid, TablePartitionId commitPartitionId, - @Nullable ByteBuffer rowBuffer, + @Nullable BinaryRow row, @Nullable Consumer<RowId> onApplication ) { indexUpdateHandler.waitIndexes(); storage.runConsistently(locker -> { - BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : null; RowId rowId = new RowId(partitionId, rowUuid); int commitTblId = commitPartitionId.tableId(); int commitPartId = commitPartitionId.partitionId(); @@ -151,7 +149,7 @@ public class StorageUpdateHandler { */ public void handleUpdateAll( UUID txId, - Map<UUID, ByteBuffer> rowsToUpdate, + Map<UUID, BinaryRowMessage> rowsToUpdate, TablePartitionId commitPartitionId, @Nullable Consumer<Collection<RowId>> onReplication ) { @@ -165,11 +163,11 @@ public class StorageUpdateHandler { List<RowId> rowIds = new ArrayList<>(); // Sort IDs to prevent deadlock. Natural UUID order matches RowId order within the same partition. - SortedMap<UUID, ByteBuffer> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate); + SortedMap<UUID, BinaryRowMessage> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate); - for (Map.Entry<UUID, ByteBuffer> entry : sortedRowsToUpdateMap.entrySet()) { + for (Map.Entry<UUID, BinaryRowMessage> entry : sortedRowsToUpdateMap.entrySet()) { RowId rowId = new RowId(partitionId, entry.getKey()); - BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null; + BinaryRow row = entry.getValue() == null ? null : entry.getValue().asBinaryRow(); locker.lock(rowId); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java index 4ccb9ab6e9..667a58fde0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest; @@ -143,6 +144,11 @@ public interface TableMessageGroup { */ short BINARY_TUPLE = 17; + /** + * Message type for {@link BinaryRowMessage}. + */ + short BINARY_ROW_MESSAGE = 18; + /** * Message types for Table module RAFT commands. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java index 9654deff74..4755ac0502 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal.table.distributed.command; -import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -30,5 +30,5 @@ import org.apache.ignite.network.annotations.Transferable; public interface UpdateAllCommand extends PartitionCommand { TablePartitionIdMessage tablePartitionId(); - Map<UUID, ByteBuffer> rowsToUpdate(); + Map<UUID, BinaryRowMessage> rowsToUpdate(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java index 358432d1d7..0515214a55 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java @@ -17,9 +17,10 @@ package org.apache.ignite.internal.table.distributed.command; -import java.nio.ByteBuffer; import java.util.UUID; +import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.network.annotations.Transferable; import org.jetbrains.annotations.Nullable; @@ -33,5 +34,13 @@ public interface UpdateCommand extends PartitionCommand { UUID rowUuid(); @Nullable - ByteBuffer rowBuffer(); + BinaryRowMessage rowMessage(); + + /** Returns the row to update or {@code null} if the row should be removed. */ + @Nullable + default BinaryRow row() { + BinaryRowMessage message = rowMessage(); + + return message == null ? null : message.asBinaryRow(); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index b3746397c0..41c1ee0d3c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -231,7 +231,7 @@ public class PartitionListener implements RaftGroupListener { return; } - storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(), + storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.row(), rowId -> { txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new TreeSet<>()).add(rowId); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java index 0dacd347ac..fd5a9a3c37 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java @@ -21,7 +21,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; -import java.nio.ByteBuffer; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -31,7 +30,6 @@ import java.util.function.Function; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; @@ -46,6 +44,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.network.ClusterNode; @@ -393,9 +392,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier { private void writeVersion(ResponseEntry entry, int i) { RowId rowId = new RowId(partId(), entry.rowId()); - ByteBuffer rowVersion = entry.rowVersions().get(i); + BinaryRowMessage rowVersion = entry.rowVersions().get(i); - BinaryRow binaryRow = rowVersion == null ? null : new ByteBufferRow(rowVersion.rewind()); + BinaryRow binaryRow = rowVersion == null ? null : rowVersion.asBinaryRow(); PartitionAccess partition = partitionSnapshotStorage.partition(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java index 523b0f4834..ca115fdb24 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java @@ -17,12 +17,12 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.message; -import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.table.TableRow; import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; import org.jetbrains.annotations.Nullable; @@ -48,7 +48,7 @@ public interface SnapshotMvDataResponse extends NetworkMessage { UUID rowId(); /** List of {@link TableRow}s for a given {@link #rowId()}. */ - List<ByteBuffer> rowVersions(); + List<BinaryRowMessage> rowVersions(); /** * List of commit timestamps for all committed versions. Might be smaller than {@link #rowVersions()} if there's a write-intent diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java index 5e947e32dc..82544f9f06 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; -import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -42,6 +41,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.lang.IgniteBiTuple; @@ -264,12 +264,13 @@ public class OutgoingSnapshot { return totalBytesAfter; } - private static long rowSizeInBytes(List<ByteBuffer> rowVersions) { + private static long rowSizeInBytes(List<BinaryRowMessage> rowVersions) { long sum = 0; - for (ByteBuffer buf : rowVersions) { - if (buf != null) { - sum += buf.remaining(); + for (BinaryRowMessage rowMessage : rowVersions) { + if (rowMessage != null) { + // Schema version is an unsigned short. + sum += rowMessage.binaryTuple().remaining() + Short.BYTES; } } @@ -318,7 +319,7 @@ public class OutgoingSnapshot { } int count = rowVersionsN2O.size(); - List<ByteBuffer> buffers = new ArrayList<>(count); + List<BinaryRowMessage> rowVersions = new ArrayList<>(count); int commitTimestampsCount = rowVersionsN2O.get(0).isWriteIntent() ? count - 1 : count; long[] commitTimestamps = new long[commitTimestampsCount]; @@ -331,7 +332,14 @@ public class OutgoingSnapshot { ReadResult version = rowVersionsN2O.get(i); BinaryRow row = version.binaryRow(); - buffers.add(row == null ? null : row.byteBuffer()); + BinaryRowMessage rowMessage = row == null + ? null + : MESSAGES_FACTORY.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); + + rowVersions.add(rowMessage); if (version.isWriteIntent()) { assert i == 0 : rowVersionsN2O; @@ -346,7 +354,7 @@ public class OutgoingSnapshot { return MESSAGES_FACTORY.responseEntry() .rowId(rowId.uuid()) - .rowVersions(buffers) + .rowVersions(rowVersions) .timestamps(commitTimestamps) .txId(transactionId) .commitTableId(commitTableId) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java similarity index 63% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java index 98a4656483..3bb8cd8057 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java @@ -18,22 +18,22 @@ package org.apache.ignite.internal.table.distributed.replication.request; import java.nio.ByteBuffer; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; -import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; -import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.internal.schema.BinaryRowImpl; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Transferable; /** - * Single-row replica request. + * Message for transferring a {@link BinaryRow}. */ -public interface SingleRowReplicaRequest extends ReplicaRequest { - ByteBuffer binaryRowBytes(); +@Transferable(TableMessageGroup.BINARY_ROW_MESSAGE) +public interface BinaryRowMessage extends NetworkMessage { + ByteBuffer binaryTuple(); - default BinaryRow binaryRow() { - return new ByteBufferRow(binaryRowBytes()); - } + int schemaVersion(); - @Marshallable - RequestType requestType(); + default BinaryRow asBinaryRow() { + return new BinaryRowImpl(schemaVersion(), binaryTuple()); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java index 5b7c39b720..415f73253f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java @@ -17,12 +17,10 @@ package org.apache.ignite.internal.table.distributed.replication.request; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.network.annotations.Marshallable; @@ -30,18 +28,18 @@ import org.apache.ignite.network.annotations.Marshallable; * Multiple row replica request. */ public interface MultipleRowReplicaRequest extends ReplicaRequest { - Collection<ByteBuffer> binaryRowsBytes(); + Collection<BinaryRowMessage> binaryRowMessages(); /** * Deserializes binary row byte buffers into binary rows. */ default Collection<BinaryRow> binaryRows() { - Collection<ByteBuffer> binaryRowsBytes = binaryRowsBytes(); + Collection<BinaryRowMessage> binaryRowMessages = binaryRowMessages(); - var result = new ArrayList<BinaryRow>(binaryRowsBytes.size()); + var result = new ArrayList<BinaryRow>(binaryRowMessages.size()); - for (ByteBuffer buffer : binaryRowsBytes) { - result.add(new ByteBufferRow(buffer)); + for (BinaryRowMessage message : binaryRowMessages) { + result.add(message.asBinaryRow()); } return result; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java index 98a4656483..a2f033cf0e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.table.distributed.replication.request; -import java.nio.ByteBuffer; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.network.annotations.Marshallable; @@ -28,10 +26,10 @@ import org.apache.ignite.network.annotations.Marshallable; * Single-row replica request. */ public interface SingleRowReplicaRequest extends ReplicaRequest { - ByteBuffer binaryRowBytes(); + BinaryRowMessage binaryRowMessage(); default BinaryRow binaryRow() { - return new ByteBufferRow(binaryRowBytes()); + return binaryRowMessage().asBinaryRow(); } @Marshallable diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java index d088c702d3..1b5a82f6ec 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.table.distributed.replication.request; -import java.nio.ByteBuffer; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.network.annotations.Marshallable; @@ -28,16 +26,16 @@ import org.apache.ignite.network.annotations.Marshallable; * Dual row replica request. */ public interface SwapRowReplicaRequest extends ReplicaRequest { - ByteBuffer binaryRowBytes(); + BinaryRowMessage binaryRowMessage(); default BinaryRow binaryRow() { - return new ByteBufferRow(binaryRowBytes()); + return binaryRowMessage().asBinaryRow(); } - ByteBuffer oldBinaryRowBytes(); + BinaryRowMessage oldBinaryRowMessage(); default BinaryRow oldBinaryRow() { - return new ByteBufferRow(oldBinaryRowBytes()); + return oldBinaryRowMessage().asBinaryRow(); } @Marshallable diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 5f44953212..1979ef7730 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -112,6 +112,7 @@ import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder; import org.apache.ignite.internal.table.distributed.index.IndexBuilder; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest; @@ -141,6 +142,7 @@ import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.CursorUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.lang.ErrorGroups.Replicator; @@ -924,13 +926,13 @@ public class PartitionReplicaListener implements ReplicaListener { return committedReadResult.binaryRow(); }) - .thenComposeAsync(resolvedReadResult -> { - if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) { - result.add(resolvedReadResult); - } + .thenComposeAsync(resolvedReadResult -> { + if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) { + result.add(resolvedReadResult); + } - return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result); - }, scanRequestExecutor); + return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result); + }, scanRequestExecutor); } /** @@ -1404,9 +1406,9 @@ public class PartitionReplicaListener implements ReplicaListener { checkWriteIntentsBelongSameTx(writeIntents); return resolveTxState( - new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()), - writeIntent.transactionId(), - ts) + new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()), + writeIntent.transactionId(), + ts) .thenApply(readLastCommitted -> { if (readLastCommitted) { for (ReadResult wi : writeIntents) { @@ -1468,7 +1470,7 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Tests row values for equality. * - * @param row Row. + * @param row Row. * @param row2 Row. * @return {@code true} if rows are equal. */ @@ -1533,7 +1535,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(rowIdLockFuts).thenCompose(ignore -> { - Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>(); + Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>(); Collection<BinaryRow> result = new ArrayList<>(); int futNum = 0; @@ -1572,7 +1574,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(deleteExactLockFuts).thenCompose(ignore -> { - Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>(); + Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>(); Collection<BinaryRow> result = new ArrayList<>(); int futNum = 0; @@ -1642,10 +1644,14 @@ public class PartitionReplicaListener implements ReplicaListener { insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId); } - Map<UUID, ByteBuffer> convertedMap = rowsToInsert.entrySet().stream().collect( - Collectors.toMap( + Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream() + .collect(Collectors.toMap( e -> e.getKey().uuid(), - e -> e.getValue().byteBuffer())); + e -> MSG_FACTORY.binaryRowMessage() + .binaryTuple(e.getValue().tupleSlice()) + .schemaVersion(e.getValue().schemaVersion()) + .build() + )); return allOf(insertLockFuts) .thenCompose(ignored -> applyUpdateAllCommand( @@ -1679,14 +1685,14 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(rowIdFuts).thenCompose(ignore -> { - Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>(); + Map<UUID, BinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(request.binaryRowMessages().size()); int futNum = 0; - for (BinaryRow row : request.binaryRows()) { + for (BinaryRowMessage row : request.binaryRowMessages()) { RowId lockedRow = rowIdFuts[futNum++].join().get1(); - rowsToUpdate.put(lockedRow.uuid(), row.byteBuffer()); + rowsToUpdate.put(lockedRow.uuid(), row); } if (rowsToUpdate.isEmpty()) { @@ -1747,7 +1753,7 @@ public class PartitionReplicaListener implements ReplicaListener { cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), - cmd.rowBuffer(), + cmd.row(), rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> { if (v == null) { v = new TreeSet<>(); @@ -1963,7 +1969,7 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Takes all required locks on a key, before upserting. * - * @param txId Transaction id. + * @param txId Transaction id. * @return Future completes with tuple {@link RowId} and collection of {@link Lock}. */ private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) { @@ -2035,7 +2041,7 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Takes all required locks on a key, before deleting the value. * - * @param txId Transaction id. + * @param txId Transaction id. * @return Future completes with {@link RowId} or {@code null} if there is no value for remove. */ private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) { @@ -2055,7 +2061,7 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Takes all required locks on a key, before deleting the value. * - * @param txId Transaction id. + * @param txId Transaction id. * @return Future completes with {@link RowId} or {@code null} if there is no value for the key. */ private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, RowId rowId, UUID txId) { @@ -2068,7 +2074,7 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Takes all required locks on a key, before getting the value. * - * @param txId Transaction id. + * @param txId Transaction id. * @return Future completes with {@link RowId} or {@code null} if there is no value for the key. */ private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) { @@ -2124,7 +2130,7 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Takes all required locks on a key, before updating the value. * - * @param txId Transaction id. + * @param txId Transaction id. * @return Future completes with tuple {@link RowId} and collection of {@link Lock} or {@code null} if there is no suitable row. */ private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow, @@ -2284,8 +2290,7 @@ public class PartitionReplicaListener implements ReplicaListener { } /** - * Resolves a read result to the matched row. - * If the result does not match any row, the method returns a future to {@code null}. + * Resolves a read result to the matched row. If the result does not match any row, the method returns a future to {@code null}. * * @param readResult Read result. * @param timestamp Timestamp. @@ -2298,9 +2303,9 @@ public class PartitionReplicaListener implements ReplicaListener { Supplier<BinaryRow> lastCommitted ) { return resolveTxState( - new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()), - readResult.transactionId(), - timestamp) + new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()), + readResult.transactionId(), + timestamp) .thenApply(readLastCommitted -> { if (readLastCommitted) { return lastCommitted.get(); @@ -2371,7 +2376,12 @@ public class PartitionReplicaListener implements ReplicaListener { .safeTimeLong(hybridClock.nowLong()); if (row != null) { - bldr.rowBuffer(row.byteBuffer()); + BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); + + bldr.rowMessage(rowMessage); } return bldr.build(); @@ -2385,7 +2395,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param txId Transaction ID. * @return Constructed {@link UpdateAllCommand} object. */ - private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) { + private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId) { return MSG_FACTORY.updateAllCommand() .tablePartitionId(tablePartitionId(tablePartId)) .rowsToUpdate(rowsToUpdate) @@ -2418,8 +2428,8 @@ public class PartitionReplicaListener implements ReplicaListener { } /** - * Class that stores a list of futures for operations that has happened in a specific transaction. - * Also, the class has a property {@code state} that represents a transaction state. + * Class that stores a list of futures for operations that has happened in a specific transaction. Also, the class has a property + * {@code state} that represents a transaction state. */ private static class TxCleanupReadyFutureList { /** @@ -2428,8 +2438,8 @@ public class PartitionReplicaListener implements ReplicaListener { final Map<RequestType, List<CompletableFuture<?>>> futures = new EnumMap<>(RequestType.class); /** - * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states. - * If the property is {@code null} the transaction is in pending state. + * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states. If the property is + * {@code null} the transaction is in pending state. */ TxState state; } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 94aa2eadd5..ce3b671430 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -32,7 +32,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.net.ConnectException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -74,7 +73,9 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder; @@ -237,9 +238,9 @@ public class InternalTableImpl implements InternalTable { ); } - final boolean implicit = tx == null; + boolean implicit = tx == null; - final InternalTransaction tx0 = implicit ? txManager.begin() : tx; + InternalTransaction tx0 = implicit ? txManager.begin() : tx; int partId = partitionId(row); @@ -300,7 +301,7 @@ public class InternalTableImpl implements InternalTable { ); } - final boolean implicit = tx == null; + boolean implicit = tx == null; // It's possible to have null txState if transaction isn't started yet. if (!implicit && !(tx.state() == TxState.PENDING || tx.state() == null)) { @@ -308,7 +309,7 @@ public class InternalTableImpl implements InternalTable { "The operation is attempted for completed transaction")); } - final InternalTransaction tx0 = implicit ? txManager.begin() : tx; + InternalTransaction tx0 = implicit ? txManager.begin() : tx; Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows); @@ -532,7 +533,7 @@ public class InternalTableImpl implements InternalTable { tx, (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) - .binaryRowBytes(keyRow.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(keyRow)) .commitPartitionId(commitPart) .transactionId(txo.id()) .term(term) @@ -554,7 +555,7 @@ public class InternalTableImpl implements InternalTable { return replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlySingleRowReplicaRequest() .groupId(partGroupId) - .binaryRowBytes(keyRow.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(keyRow)) .requestType(RequestType.RO_GET) .readTimestampLong(readTimestamp.longValue()) .build() @@ -579,7 +580,7 @@ public class InternalTableImpl implements InternalTable { tx, (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest() .groupId(groupId) - .binaryRowsBytes(serializeBinaryRows(keyRows0)) + .binaryRowMessages(serializeBinaryRows(keyRows0)) .commitPartitionId(commitPart) .transactionId(txo.id()) .term(term) @@ -603,30 +604,36 @@ public class InternalTableImpl implements InternalTable { for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) { ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partitionRowBatch.getIntKey()).groupId(); - CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlyMultiRowReplicaRequest() + ReadOnlyMultiRowReplicaRequest request = tableMessagesFactory.readOnlyMultiRowReplicaRequest() .groupId(partGroupId) - .binaryRowsBytes(serializeBinaryRows(partitionRowBatch.getValue().requestedRows)) + .binaryRowMessages(serializeBinaryRows(partitionRowBatch.getValue().requestedRows)) .requestType(RequestType.RO_GET_ALL) .readTimestampLong(readTimestamp.longValue()) - .build() - ); + .build(); - partitionRowBatch.getValue().resultFuture = fut; + partitionRowBatch.getValue().resultFuture = replicaSvc.invoke(recipientNode, request); } return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values()); } - private static List<ByteBuffer> serializeBinaryRows(Collection<? extends BinaryRow> rows) { - var result = new ArrayList<ByteBuffer>(rows.size()); + private List<BinaryRowMessage> serializeBinaryRows(Collection<? extends BinaryRow> rows) { + var result = new ArrayList<BinaryRowMessage>(rows.size()); for (BinaryRow row : rows) { - result.add(row.byteBuffer()); + result.add(serializeBinaryRow(row)); } return result; } + private BinaryRowMessage serializeBinaryRow(BinaryRow row) { + return tableMessagesFactory.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); + } + /** {@inheritDoc} */ @Override public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction tx) { @@ -636,7 +643,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(row)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_UPSERT) @@ -680,7 +687,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(row)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_GET_AND_UPSERT) @@ -698,7 +705,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(row)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_INSERT) @@ -716,7 +723,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowsBytes(serializeBinaryRows(keyRows0)) + .binaryRowMessages(serializeBinaryRows(keyRows0)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_INSERT_ALL) @@ -735,7 +742,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(row)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_REPLACE_IF_EXIST) @@ -753,8 +760,8 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSwapRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .oldBinaryRowBytes(oldRow.byteBuffer()) - .binaryRowBytes(newRow.byteBuffer()) + .oldBinaryRowMessage(serializeBinaryRow(oldRow)) + .binaryRowMessage(serializeBinaryRow(newRow)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_REPLACE) @@ -772,7 +779,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(row)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_GET_AND_REPLACE) @@ -790,7 +797,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(keyRow.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(keyRow)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_DELETE) @@ -808,7 +815,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(oldRow.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(oldRow)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_DELETE_EXACT) @@ -826,7 +833,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(serializeBinaryRow(row)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_GET_AND_DELETE) @@ -844,7 +851,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowsBytes(serializeBinaryRows(keyRows0)) + .binaryRowMessages(serializeBinaryRows(keyRows0)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_DELETE_ALL) @@ -866,7 +873,7 @@ public class InternalTableImpl implements InternalTable { (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowsBytes(serializeBinaryRows(keyRows0)) + .binaryRowMessages(serializeBinaryRows(keyRows0)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_DELETE_EXACT_ALL) @@ -982,18 +989,16 @@ public class InternalTableImpl implements InternalTable { // won't be rolled back automatically - it's up to the user or outer engine. if (tx != null && tx.isReadOnly()) { throw new TransactionException( - new TransactionException( - TX_FAILED_READ_WRITE_OPERATION_ERR, - "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + '}' - ) + TX_FAILED_READ_WRITE_OPERATION_ERR, + "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + '}' ); } validatePartitionIndex(partId); - final boolean implicit = tx == null; + boolean implicit = tx == null; - final InternalTransaction tx0 = implicit ? txManager.begin() : tx; + InternalTransaction tx0 = implicit ? txManager.begin() : tx; return new PartitionScanPublisher( (scanId, batchSize) -> enlistCursorInTx( @@ -1346,7 +1351,7 @@ public class InternalTableImpl implements InternalTable { Function<CompletableFuture<Void>, CompletableFuture<Void>> onClose; /** True when the publisher has a subscriber, false otherwise. */ - private AtomicBoolean subscribed; + private final AtomicBoolean subscribed; /** * The constructor. @@ -1623,7 +1628,7 @@ public class InternalTableImpl implements InternalTable { return tableMessagesFactory.readWriteMultiRowReplicaRequest() .groupId(groupId) .commitPartitionId(commitPart) - .binaryRowsBytes(serializeBinaryRows(keyRows0)) + .binaryRowMessages(serializeBinaryRows(keyRows0)) .transactionId(txo.id()) .term(term) .requestType(RequestType.RW_UPSERT_ALL) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java index e5eb1e768b..7307f934e4 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.PendingComparableValuesTracker; @@ -55,13 +56,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; /** - * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and - * three indexes: primary key, hash index over value columns and sorted index over value columns. + * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and three indexes: primary key, hash index over + * value columns and sorted index over value columns. */ @ExtendWith(ConfigurationExtension.class) public abstract class IndexBaseTest extends BaseMvStoragesTest { protected static final int PARTITION_ID = 0; + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(schemaDescriptor); private static final BinaryTupleSchema PK_INDEX_SCHEMA = BinaryTupleSchema.createKeySchema(schemaDescriptor); @@ -160,13 +163,7 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) { TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); - handler.handleUpdate( - TX_ID, - rowUuid, - partitionId, - row == null ? null : row.byteBuffer(), - (unused) -> {} - ); + handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {}); } static BinaryRow defaultRow() { @@ -226,22 +223,23 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { USE_UPDATE { @Override void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) { - handler.handleUpdate( - TX_ID, - rowUuid, - partitionId, - row == null ? null : row.byteBuffer(), - (unused) -> {} - ); + handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {}); } }, /** Uses updateAll api. */ USE_UPDATE_ALL { @Override void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) { + BinaryRowMessage rowMessage = row == null + ? null + : MSG_FACTORY.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); + handler.handleUpdateAll( TX_ID, - singletonMap(rowUuid, row == null ? null : row.byteBuffer()), + singletonMap(rowUuid, rowMessage), partitionId, (unused) -> {} ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java index 60d728e269..c4a661e310 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java @@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Locale; @@ -32,14 +31,15 @@ import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.raft.Command; -import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.marshaller.KvMarshaller; import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; +import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; @@ -52,13 +52,10 @@ import org.junit.jupiter.api.Test; */ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { /** Key-value marshaller for tests. */ - protected static KvMarshaller<TestKey, TestValue> kvMarshaller; + private static KvMarshaller<TestKey, TestValue> kvMarshaller; /** Message factory to create messages - RAFT commands. */ - private TableMessagesFactory msgFactory = new TableMessagesFactory(); - - /** Factory for replica messages. */ - private ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory(); + private final TableMessagesFactory msgFactory = new TableMessagesFactory(); @BeforeAll static void beforeAll() { @@ -84,15 +81,13 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { .build() ) .rowUuid(UUID.randomUUID()) - .rowBuffer(byteBufferFromBinaryRow(1)) + .rowMessage(binaryRowMessage(1)) .txId(TestTransactionIds.newTransactionId()) .build(); UpdateCommand readCmd = copyCommand(cmd); - assertEquals(cmd.txId(), readCmd.txId()); - assertEquals(cmd.rowUuid(), readCmd.rowUuid()); - assertEquals(cmd.rowBuffer(), readCmd.rowBuffer()); + assertEquals(cmd, readCmd); } @Test @@ -111,15 +106,15 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { assertEquals(cmd.txId(), readCmd.txId()); assertEquals(cmd.rowUuid(), readCmd.rowUuid()); - assertNull(readCmd.rowBuffer()); + assertNull(readCmd.rowMessage()); } @Test public void testUpdateAllCommand() throws Exception { - Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>(); + Map<UUID, BinaryRowMessage> rowsToUpdate = new HashMap<>(); for (int i = 0; i < 10; i++) { - rowsToUpdate.put(TestTransactionIds.newTransactionId(), byteBufferFromBinaryRow(i)); + rowsToUpdate.put(TestTransactionIds.newTransactionId(), binaryRowMessage(i)); } var cmd = msgFactory.updateAllCommand() @@ -136,7 +131,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { assertEquals(cmd.txId(), readCmd.txId()); - for (Map.Entry<UUID, ByteBuffer> entry : cmd.rowsToUpdate().entrySet()) { + for (Map.Entry<UUID, BinaryRowMessage> entry : cmd.rowsToUpdate().entrySet()) { assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey())); var readVal = readCmd.rowsToUpdate().get(entry.getKey()); @@ -148,7 +143,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { @Test public void testRemoveAllCommand() throws Exception { - Map<UUID, ByteBuffer> rowsToRemove = new HashMap<>(); + Map<UUID, BinaryRowMessage> rowsToRemove = new HashMap<>(); for (int i = 0; i < 10; i++) { rowsToRemove.put(TestTransactionIds.newTransactionId(), null); @@ -245,7 +240,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { .txId(updateCommand.txId()) .rowUuid(updateCommand.rowUuid()) .tablePartitionId(updateCommand.tablePartitionId()) - .rowBuffer(updateCommand.rowBuffer()) + .rowMessage(updateCommand.rowMessage()) .build(); } else if (cmd instanceof UpdateAllCommand) { UpdateAllCommand updateCommand = (UpdateAllCommand) cmd; @@ -262,8 +257,16 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { } } - private static ByteBuffer byteBufferFromBinaryRow(int id) throws Exception { - return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))).byteBuffer(); + private BinaryRowMessage binaryRowMessage(int id) throws Exception { + Row row = kvMarshaller.marshal( + new TestKey(id, String.valueOf(id)), + new TestValue(id, String.valueOf(id)) + ); + + return msgFactory.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); } /** diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index f1d103dc99..d7c4f76f78 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -38,7 +38,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.Serializable; -import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -91,6 +90,7 @@ import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -621,14 +621,12 @@ public class PartitionCommandListenerTest { * Inserts all rows. */ private void insertAll() { - Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT); + Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT); UUID txId = TestTransactionIds.newTransactionId(); var commitPartId = new TablePartitionId(1, PARTITION_ID); for (int i = 0; i < KEY_COUNT; i++) { - Row row = getTestRow(i, i); - - rows.put(TestTransactionIds.newTransactionId(), row.byteBuffer()); + rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i)); } HybridTimestamp commitTimestamp = hybridClock.now(); @@ -660,12 +658,12 @@ public class PartitionCommandListenerTest { private void updateAll(Function<Integer, Integer> keyValueMapper) { UUID txId = TestTransactionIds.newTransactionId(); var commitPartId = new TablePartitionId(1, PARTITION_ID); - Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT); + Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { - Row row = getTestRow(i, keyValueMapper.apply(i)); + BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i)); - rows.put(readRow(row).uuid(), row.byteBuffer()); + rows.put(readRow(row.asBinaryRow()).uuid(), row); } HybridTimestamp commitTimestamp = hybridClock.now(); @@ -695,12 +693,12 @@ public class PartitionCommandListenerTest { private void deleteAll() { UUID txId = TestTransactionIds.newTransactionId(); var commitPartId = new TablePartitionId(1, PARTITION_ID); - Map<UUID, ByteBuffer> keyRows = new HashMap<>(KEY_COUNT); + Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { - Row row = getTestRow(i, i); + BinaryRowMessage row = getTestRow(i, i); - keyRows.put(readRow(row).uuid(), null); + keyRows.put(readRow(row.asBinaryRow()).uuid(), null); } HybridTimestamp commitTimestamp = hybridClock.now(); @@ -734,8 +732,8 @@ public class PartitionCommandListenerTest { commandListener.onWrite(iterator((i, clo) -> { UUID txId = TestTransactionIds.newTransactionId(); - Row row = getTestRow(i, keyValueMapper.apply(i)); - RowId rowId = readRow(row); + BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i)); + RowId rowId = readRow(row.asBinaryRow()); assertNotNull(rowId); @@ -749,7 +747,7 @@ public class PartitionCommandListenerTest { .tableId(1) .partitionId(PARTITION_ID).build()) .rowUuid(rowId.uuid()) - .rowBuffer(row.byteBuffer()) + .rowMessage(row) .txId(txId) .safeTimeLong(hybridClock.nowLong()) .build()); @@ -779,7 +777,7 @@ public class PartitionCommandListenerTest { commandListener.onWrite(iterator((i, clo) -> { UUID txId = TestTransactionIds.newTransactionId(); - Row row = getTestRow(i, i); + BinaryRow row = getTestRow(i, i).asBinaryRow(); RowId rowId = readRow(row); assertNotNull(rowId); @@ -857,7 +855,6 @@ public class PartitionCommandListenerTest { commandListener.onWrite(iterator((i, clo) -> { UUID txId = TestTransactionIds.newTransactionId(); - Row row = getTestRow(i, i); txIds.add(txId); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); @@ -868,7 +865,7 @@ public class PartitionCommandListenerTest { .tableId(1) .partitionId(PARTITION_ID).build()) .rowUuid(UUID.randomUUID()) - .rowBuffer(row.byteBuffer()) + .rowMessage(getTestRow(i, i)) .txId(txId) .safeTimeLong(hybridClock.nowLong()) .build()); @@ -909,13 +906,18 @@ public class PartitionCommandListenerTest { * * @return Row. */ - private Row getTestRow(int key, int val) { + private BinaryRowMessage getTestRow(int key, int val) { RowAssembler rowBuilder = new RowAssembler(SCHEMA); rowBuilder.appendInt(key); rowBuilder.appendInt(val); - return new Row(SCHEMA, rowBuilder.build()); + BinaryRow row = rowBuilder.build(); + + return msgFactory.binaryRowMessage() + .binaryTuple(row.tupleSlice()) + .schemaVersion(row.schemaVersion()) + .build(); } private void invokeBatchedCommand(WriteCommand cmd) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 43a7b826ca..fbae8d3e71 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -44,7 +44,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -85,6 +84,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; @@ -352,7 +352,7 @@ public class IncomingSnapshotCopierTest { Collections.reverse(readResults); - List<ByteBuffer> rowVersions = new ArrayList<>(); + List<BinaryRowMessage> rowVersions = new ArrayList<>(); long[] timestamps = new long[readResults.size() + (readResults.get(0).isWriteIntent() ? -1 : 0)]; UUID txId = null; @@ -361,7 +361,12 @@ public class IncomingSnapshotCopierTest { int j = 0; for (ReadResult readResult : readResults) { - rowVersions.add(readResult.binaryRow().byteBuffer()); + BinaryRowMessage rowMessage = TABLE_MSG_FACTORY.binaryRowMessage() + .binaryTuple(readResult.binaryRow().tupleSlice()) + .schemaVersion(readResult.binaryRow().schemaVersion()) + .build(); + + rowVersions.add(rowMessage); if (readResult.isWriteIntent()) { txId = readResult.transactionId(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java index 05f5199784..56afb037c2 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java @@ -30,12 +30,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; +import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; @@ -52,6 +54,9 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) class OutgoingSnapshotMvDataStreamingTest { + private static final BinaryRow ROW_1 = new BinaryRowImpl(0, ByteBuffer.wrap(new byte[]{1})); + private static final BinaryRow ROW_2 = new BinaryRowImpl(0, ByteBuffer.wrap(new byte[]{2})); + @Mock private PartitionAccess partitionAccess; @@ -93,10 +98,10 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void sendsCommittedAndUncommittedVersionsFromStorage() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); ReadResult version2 = ReadResult.createFromWriteIntent( rowId1, - new ByteBufferRow(new byte[]{2}), + ROW_2, transactionId, commitTableId, 42, @@ -118,8 +123,8 @@ class OutgoingSnapshotMvDataStreamingTest { assertThat(responseRow.timestamps(), is(equalTo(new long[] {version1.commitTimestamp().longValue()}))); assertThat(responseRow.rowVersions(), hasSize(2)); - assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{1})); - assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{2})); + assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_1)); + assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_2)); } private void configurePartitionAccessToHaveExactlyOneRowWith(List<ReadResult> versions) { @@ -148,8 +153,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void reversesOrderOfVersionsObtainedFromPartition() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); configurePartitionAccessToHaveExactlyOneRowWith(List.of(version1, version2)); @@ -164,14 +169,14 @@ class OutgoingSnapshotMvDataStreamingTest { ); assertThat(responseRow.rowVersions(), hasSize(2)); - assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{2})); - assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{1})); + assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_2)); + assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_1)); } @Test void iteratesRowsInPartition() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId2, ROW_2, clock.now()); when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1); when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1)); @@ -184,9 +189,9 @@ class OutgoingSnapshotMvDataStreamingTest { assertThat(response.rows(), hasSize(2)); assertThat(response.rows().get(0).rowVersions(), hasSize(1)); - assertThat(response.rows().get(0).rowVersions().get(0).array(), is(new byte[]{1})); + assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(ROW_1)); assertThat(response.rows().get(1).rowVersions(), hasSize(1)); - assertThat(response.rows().get(1).rowVersions().get(0).array(), is(new byte[]{2})); + assertThat(response.rows().get(1).rowVersions().get(0).asBinaryRow(), is(ROW_2)); } @Test @@ -209,10 +214,10 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void sendsCommittedAndUncommittedVersionsFromQueue() { - ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now()); ReadResult version2 = ReadResult.createFromWriteIntent( rowIdOutOfOrder, - new ByteBufferRow(new byte[]{2}), + ROW_2, transactionId, commitTableId, 42, @@ -244,8 +249,8 @@ class OutgoingSnapshotMvDataStreamingTest { assertThat(responseRow.timestamps(), is(equalTo(new long[] {version1.commitTimestamp().longValue()}))); assertThat(responseRow.rowVersions(), hasSize(2)); - assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{1})); - assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{2})); + assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_1)); + assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_2)); } private void configureClosestRowIdToBeEmpty() { @@ -254,8 +259,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void sendsOutOfOrderRowsWithHighestPriority() { - ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1)); @@ -309,7 +314,7 @@ class OutgoingSnapshotMvDataStreamingTest { void doesNotSendWriteIntentTimestamp() { ReadResult version = ReadResult.createFromWriteIntent( rowId1, - new ByteBufferRow(new byte[]{1}), + ROW_1, transactionId, commitTableId, 42, @@ -334,8 +339,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId2, ROW_2, clock.now()); when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1); when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1)); @@ -349,8 +354,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() { - ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1)); @@ -373,8 +378,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void mvDataResponseThatIsNotLastHasFinishFalse() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1); when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2)); @@ -387,7 +392,9 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void sendsRowsFromPartitionBiggerThanHint() { - ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[1000]), clock.now()); + var row = new BinaryRowImpl(0, ByteBuffer.allocate(1000)); + + ReadResult version = ReadResult.createFromCommitted(rowId1, row, clock.now()); configurePartitionAccessToHaveExactlyOneRowWith(List.of(version)); @@ -395,12 +402,14 @@ class OutgoingSnapshotMvDataStreamingTest { assertThat(response.rows(), hasSize(1)); assertThat(response.rows().get(0).rowVersions(), hasSize(1)); - assertThat(response.rows().get(0).rowVersions().get(0).limit(), is(1000)); + assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(row)); } @Test void sendsRowsFromOutOfOrderQueueBiggerThanHint() { - ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[1000]), clock.now()); + var row = new BinaryRowImpl(0, ByteBuffer.allocate(1000)); + + ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, row, clock.now()); when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version)); @@ -418,7 +427,7 @@ class OutgoingSnapshotMvDataStreamingTest { assertThat(response.rows(), hasSize(1)); assertThat(response.rows().get(0).rowVersions(), hasSize(1)); - assertThat(response.rows().get(0).rowVersions().get(0).limit(), is(1000)); + assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(row)); } @Test @@ -434,8 +443,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void lastSentRowIdIsPassed() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1); when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2)); @@ -454,8 +463,8 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void notYetSentRowIdIsNotPassed() { - ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); - ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now()); + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1); when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2)); @@ -474,7 +483,7 @@ class OutgoingSnapshotMvDataStreamingTest { @Test void anyRowIdIsPassedForFinishedSnapshot() { - ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now()); + ReadResult version = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); configurePartitionAccessToHaveExactlyOneRowWith(List.of(version)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index eb73abcd65..c1def6368f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; @@ -253,7 +254,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest .term(1L) .commitPartitionId(PARTITION_ID) .transactionId(TRANSACTION_ID) - .binaryRowBytes(testBinaryRow.byteBuffer()) + .binaryRowMessage(binaryRowMessage(testBinaryRow)) .requestType(arg.type) .build()); @@ -299,7 +300,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest .term(1L) .commitPartitionId(PARTITION_ID) .transactionId(TRANSACTION_ID) - .binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList())) + .binaryRowMessages(rows.stream().map(PartitionReplicaListenerIndexLockingTest::binaryRowMessage).collect(toList())) .requestType(arg.type) .build()); @@ -397,6 +398,13 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest }; } + private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) { + return TABLE_MESSAGES_FACTORY.binaryRowMessage() + .binaryTuple(binaryRow.tupleSlice()) + .schemaVersion(binaryRow.schemaVersion()) + .build(); + } + static class ReadWriteTestArg { private final RequestType type; private final LockMode expectedLockOnUniqueHash; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index e9fbfe4e29..0b6fdd5e17 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -78,6 +78,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; +import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; @@ -115,6 +116,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException; @@ -513,7 +515,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest() .groupId(grpId) .readTimestampLong(clock.nowLong()) - .binaryRowBytes(testBinaryKey.byteBuffer()) + .binaryRowMessage(binaryRowMessage(testBinaryKey)) .requestType(RequestType.RO_GET) .build()); @@ -536,7 +538,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest() .groupId(grpId) .readTimestampLong(clock.nowLong()) - .binaryRowBytes(testBinaryKey.byteBuffer()) + .binaryRowMessage(binaryRowMessage(testBinaryKey)) .requestType(RequestType.RO_GET) .build()); @@ -559,7 +561,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest() .groupId(grpId) .readTimestampLong(clock.nowLong()) - .binaryRowBytes(testBinaryKey.byteBuffer()) + .binaryRowMessage(binaryRowMessage(testBinaryKey)) .requestType(RequestType.RO_GET) .build()); @@ -581,7 +583,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest() .groupId(grpId) .readTimestampLong(clock.nowLong()) - .binaryRowBytes(testBinaryKey.byteBuffer()) + .binaryRowMessage(binaryRowMessage(testBinaryKey)) .requestType(RequestType.RO_GET) .build()); @@ -604,7 +606,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest() .groupId(grpId) .readTimestampLong(clock.nowLong()) - .binaryRowBytes(testBinaryKey.byteBuffer()) + .binaryRowMessage(binaryRowMessage(testBinaryKey)) .requestType(RequestType.RO_GET) .build()); @@ -985,7 +987,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .transactionId(txId) .requestType(requestType) - .binaryRowBytes(binaryRow.byteBuffer()) + .binaryRowMessage(binaryRowMessage(binaryRow)) .term(1L) .commitPartitionId(commitPartitionId()) .build() @@ -1001,7 +1003,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .transactionId(txId) .requestType(requestType) - .binaryRowsBytes(binaryRows.stream().map(BinaryRow::byteBuffer).collect(toList())) + .binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList())) .term(1L) .commitPartitionId(commitPartitionId()) .build() @@ -1022,7 +1024,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .transactionId(txId) .requestType(RequestType.RW_INSERT) - .binaryRowBytes(binaryRow.byteBuffer()) + .binaryRowMessage(binaryRowMessage(binaryRow)) .term(1L) .commitPartitionId(commitPartitionId()) .build(); @@ -1049,7 +1051,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .transactionId(txId) .requestType(RequestType.RW_UPSERT_ALL) - .binaryRowsBytes(asList(binaryRow0.byteBuffer(), binaryRow1.byteBuffer())) + .binaryRowMessages(asList(binaryRowMessage(binaryRow0), binaryRowMessage(binaryRow1))) .term(1L) .commitPartitionId(commitPartitionId()) .build(); @@ -1072,7 +1074,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); - if (rowEquals(binaryRow, row)) { + if (binaryRow.equals(row)) { found = true; } } @@ -1083,7 +1085,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); - assertTrue(row == null || !rowEquals(row, binaryRow)); + assertTrue(row == null || !row.equals(binaryRow)); } } @@ -1226,11 +1228,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { ? (upsertAfterDelete ? br1 : null) : (insertFirst ? br1 : null); - if (expected == null) { - assertNull(res); - } else { - assertTrue(rowEquals(expected, res)); - } + assertEquals(expected, res); } cleanup(tx1); @@ -1465,8 +1463,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .transactionId(targetTxId) .requestType(RequestType.RW_REPLACE) - .oldBinaryRowBytes(binaryRow(key, new TestValue(1, "v1"), kvMarshaller).byteBuffer()) - .binaryRowBytes(binaryRow(key, new TestValue(3, "v3"), kvMarshaller).byteBuffer()) + .oldBinaryRowMessage(binaryRowMessage(binaryRow(key, new TestValue(1, "v1"), kvMarshaller))) + .binaryRowMessage(binaryRowMessage(binaryRow(key, new TestValue(3, "v3"), kvMarshaller))) .term(1L) .commitPartitionId(commitPartitionId()) .build() @@ -1528,7 +1526,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .requestType(RequestType.RW_UPSERT) .transactionId(txId) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(binaryRowMessage(row)) .term(1L) .commitPartitionId(new TablePartitionId(tblId, partId)) .build() @@ -1540,7 +1538,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .requestType(RequestType.RW_DELETE) .transactionId(txId) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(binaryRowMessage(row)) .term(1L) .commitPartitionId(new TablePartitionId(tblId, partId)) .build() @@ -1552,7 +1550,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .requestType(RequestType.RO_GET) .readTimestampLong(readTimestamp) - .binaryRowBytes(row.byteBuffer()) + .binaryRowMessage(binaryRowMessage(row)) .build() ); @@ -1564,7 +1562,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .groupId(grpId) .requestType(RequestType.RO_GET_ALL) .readTimestampLong(readTimestamp) - .binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList())) + .binaryRowMessages(rows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList())) .build() ); @@ -1619,11 +1617,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } protected BinaryRow binaryRow(int i) { - try { - return kvMarshaller.marshal(new TestKey(i, "k" + i), new TestValue(i, "v" + i)); - } catch (MarshallerException e) { - throw new AssertionError(e); - } + return binaryRow(new TestKey(i, "k" + i), new TestValue(i, "v" + i)); } private BinaryRow binaryRow(TestKey key, TestValue value) { @@ -1632,7 +1626,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private static BinaryRow binaryRow(TestKey key, TestValue value, KvMarshaller<TestKey, TestValue> marshaller) { try { - return marshaller.marshal(key, value); + Row row = marshaller.marshal(key, value); + + return new BinaryRowImpl(row.schemaVersion(), row.tupleSlice()); } catch (MarshallerException e) { throw new AssertionError(e); } @@ -1654,10 +1650,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } } - private static boolean rowEquals(BinaryRow row1, BinaryRow row2) { - return row1.schemaVersion() == row2.schemaVersion() - && row1.hasValue() == row2.hasValue() - && row1.tupleSlice().equals(row2.tupleSlice()); + private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) { + return TABLE_MESSAGES_FACTORY.binaryRowMessage() + .binaryTuple(binaryRow.tupleSlice()) + .schemaVersion(binaryRow.schemaVersion()) + .build(); } /**