This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c60f052bd0 IGNITE-20435 Preserve key order in batch operations (deleteAll, deleteAllExact, insertAll) (#2664) c60f052bd0 is described below commit c60f052bd074cad30c753cc1903018a45362a34b Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Tue Oct 17 12:08:21 2023 +0300 IGNITE-20435 Preserve key order in batch operations (deleteAll, deleteAllExact, insertAll) (#2664) --- .../java/org/apache/ignite/table/RecordView.java | 45 ++-- .../client/table/ClientRecordBinaryView.java | 14 +- .../internal/client/table/ClientRecordView.java | 12 +- .../client/table/ClientTupleSerializer.java | 4 +- .../ignite/client/fakes/FakeInternalTable.java | 33 +-- .../client-test/key_value_binary_view_test.cpp | 19 +- .../cpp/tests/client-test/key_value_view_test.cpp | 14 +- .../tests/client-test/record_binary_view_test.cpp | 19 +- .../cpp/tests/client-test/record_view_test.cpp | 14 +- ...ItRoReadsTest.java => ItInternalTableTest.java} | 262 ++++++++++++++------- .../sql/engine/exec/UpdatableTableImpl.java | 95 ++++---- .../ignite/internal/table/ItColocationTest.java | 5 +- .../ignite/internal/table/InternalTable.java | 24 +- .../internal/table/RecordBinaryViewImpl.java | 12 +- .../ignite/internal/table/RecordViewImpl.java | 12 +- .../replicator/PartitionReplicaListener.java | 11 +- .../distributed/storage/InternalTableImpl.java | 116 +++++---- .../table/distributed/storage/RowBatch.java | 8 +- .../distributed/storage/InternalTableImplTest.java | 39 ++- .../table/impl/DummyInternalTableImpl.java | 3 +- 20 files changed, 454 insertions(+), 307 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java index e80a9da595..e95fc98b2e 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java +++ b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java @@ -143,20 +143,23 @@ public 
interface RecordView<R> extends DataStreamerTarget<R> { /** * Inserts into a table records that do not exist, skips those that exist. * - * @param tx Transaction or {@code null} to auto-commit. + * @param tx Transaction or {@code null} to auto-commit. * @param recs Records to insert into the table. The records cannot be {@code null}. - * @return Skipped records. + * @return Skipped records. The order of collection elements is guaranteed to be the same as the order of {@code recs}. If a record is + * inserted, the element will be excluded from the collection result. */ - Collection<R> insertAll(@Nullable Transaction tx, Collection<R> recs); + List<R> insertAll(@Nullable Transaction tx, Collection<R> recs); /** * Asynchronously inserts into a table records that do not exist, skips those that exist. * - * @param tx Transaction or {@code null} to auto-commit. + * @param tx Transaction or {@code null} to auto-commit. * @param recs Records to insert into the table. The records cannot be {@code null}. - * @return Future representing pending completion of the operation. + * @return Future representing pending completion of the operation, with rejected rows for insertion in the result. The order of + * collection elements is guaranteed to be the same as the order of {@code recs}. If a record is inserted, the element will be + * excluded from the collection result. */ - CompletableFuture<Collection<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs); + CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs); /** * Replaces an existing record associated with the same key column values as the given record. @@ -272,36 +275,42 @@ public interface RecordView<R> extends DataStreamerTarget<R> { /** * Removes from a table records with the same key column values as the given one. * - * @param tx Transaction or {@code null} to auto-commit. + * @param tx Transaction or {@code null} to auto-commit. 
* @param keyRecs Records with the key columns set. The records cannot be {@code null}. - * @return Records with the key columns set that did not exist. + * @return Records with the key columns set that did not exist. The order of collection elements is guaranteed to be the same as the + * order of {@code keyRecs}. If a record is removed, the element will be excluded from the collection result. */ - Collection<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs); + List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs); /** * Asynchronously removes from a table records with the same key column values as the given one. * - * @param tx Transaction or {@code null} to auto-commit. + * @param tx Transaction or {@code null} to auto-commit. * @param keyRecs Records with the key columns set. The records cannot be {@code null}. - * @return Future that represents the pending completion of the operation. + * @return Future represents the pending completion of the operation, with rejected rows for deletion in the result. The order of + * collection elements is guaranteed to be the same as the order of {@code keyRecs}. If a record is removed, the element will be + * excluded from the collection result. */ - CompletableFuture<Collection<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs); + CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs); /** * Remove the given records from a table. * - * @param tx Transaction or {@code null} to auto-commit. + * @param tx Transaction or {@code null} to auto-commit. * @param recs Records to delete. The records cannot be {@code null}. - * @return Records that were not deleted. + * @return Records that were not deleted. The order of collection elements is guaranteed to be the same as the order of {@code recs}. If + * a record is removed, the element will be excluded from the collection result. 
*/ - Collection<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs); + List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs); /** * Asynchronously removes the given records from a table. * - * @param tx Transaction or {@code null} to auto-commit. + * @param tx Transaction or {@code null} to auto-commit. * @param recs Records to delete. The records cannot be {@code null}. - * @return Future that represents the pending completion of the operation. + * @return Future represents the pending completion of the operation, with rejected rows for deletion in the result. The order of + * collection elements is guaranteed to be the same as the order of {@code recs}. If a record is removed, the element will be + * excluded from the collection result. */ - CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> recs); + CompletableFuture<List<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> recs); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java index 3610d15c06..5c7d17c6ec 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java @@ -178,17 +178,17 @@ public class ClientRecordBinaryView implements RecordView<Tuple> { /** {@inheritDoc} */ @Override - public Collection<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> recs) { + public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> recs) { return sync(insertAllAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) { + public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction tx, 
Collection<Tuple> recs) { Objects.requireNonNull(recs); if (recs.isEmpty()) { - return CompletableFuture.completedFuture(Collections.emptyList()); + return completedFuture(Collections.emptyList()); } return tbl.doSchemaOutInOpAsync( @@ -315,13 +315,13 @@ public class ClientRecordBinaryView implements RecordView<Tuple> { /** {@inheritDoc} */ @Override - public Collection<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> keyRecs) { + public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> keyRecs) { return sync(deleteAllAsync(tx, keyRecs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) { + public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) { Objects.requireNonNull(keyRecs); if (keyRecs.isEmpty()) { @@ -338,13 +338,13 @@ public class ClientRecordBinaryView implements RecordView<Tuple> { /** {@inheritDoc} */ @Override - public Collection<Tuple> deleteAllExact(@Nullable Transaction tx, Collection<Tuple> recs) { + public List<Tuple> deleteAllExact(@Nullable Transaction tx, Collection<Tuple> recs) { return sync(deleteAllExactAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, Collection<Tuple> recs) { + public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, Collection<Tuple> recs) { Objects.requireNonNull(recs); if (recs.isEmpty()) { diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java index 0b3f514a6d..33745d7fbc 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java @@ -178,13 +178,13 @@ 
public class ClientRecordView<R> implements RecordView<R> { /** {@inheritDoc} */ @Override - public Collection<R> insertAll(@Nullable Transaction tx, Collection<R> recs) { + public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) { return sync(insertAllAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs) { + public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs) { Objects.requireNonNull(recs); if (recs.isEmpty()) { @@ -312,13 +312,13 @@ public class ClientRecordView<R> implements RecordView<R> { /** {@inheritDoc} */ @Override - public Collection<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) { + public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) { return sync(deleteAllAsync(tx, keyRecs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs) { + public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs) { Objects.requireNonNull(keyRecs); if (keyRecs.isEmpty()) { @@ -335,13 +335,13 @@ public class ClientRecordView<R> implements RecordView<R> { /** {@inheritDoc} */ @Override - public Collection<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs) { + public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs) { return sync(deleteAllExactAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> recs) { + public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> recs) { Objects.requireNonNull(recs); if (recs.isEmpty()) { diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java index 63f6901ff5..e5a7d683b9 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java @@ -312,11 +312,11 @@ public class ClientTupleSerializer { return res; } - static Collection<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in) { + static List<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in) { return readTuples(schema, in, false); } - static Collection<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in, boolean keyOnly) { + static List<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in, boolean keyOnly) { var cnt = in.unpackInt(); var res = new ArrayList<Tuple>(cnt); diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index 44aef27200..02e382a279 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -79,13 +79,11 @@ public class FakeInternalTable implements InternalTable { this.keyExtractor = keyExtractor; } - /** {@inheritDoc} */ @Override public MvTableStorage storage() { throw new UnsupportedOperationException("Not implemented yet"); } - /** {@inheritDoc} */ @Override public int partitions() { return 1; @@ -96,19 +94,16 @@ public class FakeInternalTable implements InternalTable { return tableId; } - /** {@inheritDoc} */ @Override public String name() { return tableName; } - /** {@inheritDoc} */ @Override public int partitionId(BinaryRowEx row) { return 0; } - /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) { return 
completedFuture(getImpl(keyRow.tupleSlice(), keyRow)); @@ -128,7 +123,6 @@ public class FakeInternalTable implements InternalTable { return data.get(key); } - /** {@inheritDoc} */ @Override public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, @Nullable InternalTransaction tx) { var res = new ArrayList<BinaryRow>(); @@ -156,7 +150,6 @@ public class FakeInternalTable implements InternalTable { return null; } - /** {@inheritDoc} */ @Override public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) { upsertImpl(keyExtractor.extractColumns(row), row); @@ -170,7 +163,6 @@ public class FakeInternalTable implements InternalTable { data.put(key.byteBuffer(), row); } - /** {@inheritDoc} */ @Override public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { for (var row : rows) { @@ -181,13 +173,11 @@ public class FakeInternalTable implements InternalTable { return completedFuture(null); } - /** {@inheritDoc} */ @Override public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) { throw new UnsupportedOperationException(); } - /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row, @Nullable InternalTransaction tx) { @@ -202,7 +192,6 @@ public class FakeInternalTable implements InternalTable { return completedFuture(res); } - /** {@inheritDoc} */ @Override public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable InternalTransaction tx) { BinaryTuple key = keyExtractor.extractColumns(row); @@ -218,9 +207,8 @@ public class FakeInternalTable implements InternalTable { return completedFuture(old == null); } - /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { + public CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { var skipped 
= new ArrayList<BinaryRow>(); for (var row : rows) { @@ -233,7 +221,6 @@ public class FakeInternalTable implements InternalTable { return completedFuture(skipped); } - /** {@inheritDoc} */ @Override public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable InternalTransaction tx) { BinaryTuple key = keyExtractor.extractColumns(row); @@ -241,7 +228,6 @@ public class FakeInternalTable implements InternalTable { return completedFuture(replaceImpl(key, row, tx) != null); } - /** {@inheritDoc} */ @Override public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, @Nullable InternalTransaction tx) { BinaryTuple key = keyExtractor.extractColumns(oldRow); @@ -275,7 +261,6 @@ public class FakeInternalTable implements InternalTable { return old; } - /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row, @Nullable InternalTransaction tx) { BinaryTuple key = keyExtractor.extractColumns(row); @@ -287,7 +272,6 @@ public class FakeInternalTable implements InternalTable { return completedFuture(replace); } - /** {@inheritDoc} */ @Override public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx) { BinaryRow old = getImpl(keyRow.tupleSlice(), keyRow); @@ -300,7 +284,6 @@ public class FakeInternalTable implements InternalTable { return completedFuture(old != null); } - /** {@inheritDoc} */ @Override public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, @Nullable InternalTransaction tx) { var res = false; @@ -318,7 +301,6 @@ public class FakeInternalTable implements InternalTable { return completedFuture(res); } - /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row, @Nullable InternalTransaction tx) { BinaryRow old = getImpl(row.tupleSlice(), row); @@ -331,9 +313,8 @@ public class FakeInternalTable implements InternalTable { return completedFuture(old); } - /** {@inheritDoc} */ @Override - public 
CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { + public CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { var skipped = new ArrayList<BinaryRow>(); for (var row : rows) { @@ -346,9 +327,8 @@ public class FakeInternalTable implements InternalTable { return completedFuture(skipped); } - /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { + public CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { var skipped = new ArrayList<BinaryRow>(); for (var row : rows) { @@ -434,42 +414,35 @@ public class FakeInternalTable implements InternalTable { throw new IgniteInternalException(new OperationNotSupportedException()); } - /** {@inheritDoc} */ @Override public List<String> assignments() { throw new IgniteInternalException(new OperationNotSupportedException()); } - /** {@inheritDoc} */ @Override public CompletableFuture<List<PrimaryReplica>> primaryReplicas() { return CompletableFuture.failedFuture(new IgniteInternalException(new OperationNotSupportedException())); } - /** {@inheritDoc} */ @Override public ClusterNode leaderAssignment(int partition) { throw new IgniteInternalException(new OperationNotSupportedException()); } - /** {@inheritDoc} */ @Override public RaftGroupService partitionRaftGroupService(int partition) { return null; } - /** {@inheritDoc} */ @Override public TxStateTableStorage txStateStorage() { return null; } - /** {@inheritDoc} */ @Override public int partition(BinaryRowEx keyRow) { return 0; } - /** {@inheritDoc} */ @Override public void close() { // No-op. 
diff --git a/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp b/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp index 884d1e2ad2..3073c3db38 100644 --- a/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp +++ b/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp @@ -860,12 +860,11 @@ TEST_F(key_value_binary_view_test, remove_all_nonexisting_keys_return_all) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). EXPECT_EQ(1, res[0].column_count()); - EXPECT_EQ(2, res[0].get<int64_t>("key")); + EXPECT_EQ(1, res[0].get<int64_t>("key")); EXPECT_EQ(1, res[1].column_count()); - EXPECT_EQ(1, res[1].get<int64_t>("key")); + EXPECT_EQ(2, res[1].get<int64_t>("key")); } TEST_F(key_value_binary_view_test, remove_all_only_existing) { @@ -896,12 +895,11 @@ TEST_F(key_value_binary_view_test, remove_all_overlapped) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). EXPECT_EQ(1, res[0].column_count()); - EXPECT_EQ(12, res[0].get<int64_t>("key")); + EXPECT_EQ(11, res[0].get<int64_t>("key")); EXPECT_EQ(1, res[1].column_count()); - EXPECT_EQ(11, res[1].get<int64_t>("key")); + EXPECT_EQ(12, res[1].get<int64_t>("key")); } TEST_F(key_value_binary_view_test, remove_all_empty) { @@ -912,8 +910,15 @@ TEST_F(key_value_binary_view_test, remove_all_empty) { TEST_F(key_value_binary_view_test, remove_all_exact_nonexisting) { auto res = kv_view.remove_all(nullptr, {{get_tuple(1), get_tuple("foo")}, {get_tuple(2), get_tuple("bar")}}); - // TODO: Key order should be preserved by the server (IGNITE-20435). 
ASSERT_EQ(2, res.size()); + + EXPECT_EQ(2, res[0].column_count()); + EXPECT_EQ(1, res[0].get<int64_t>("key")); + EXPECT_EQ("foo", res[0].get<std::string>("val")); + + EXPECT_EQ(2, res[1].column_count()); + EXPECT_EQ(2, res[1].get<int64_t>("key")); + EXPECT_EQ("bar", res[1].get<std::string>("val")); } TEST_F(key_value_binary_view_test, remove_all_exact_overlapped) { diff --git a/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp b/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp index 98ee0264b0..39b13a8238 100644 --- a/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp +++ b/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp @@ -699,9 +699,8 @@ TEST_F(key_value_view_test, remove_all_nonexisting_keys_return_all) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). - EXPECT_EQ(2, res[0].key); - EXPECT_EQ(1, res[1].key); + EXPECT_EQ(1, res[0].key); + EXPECT_EQ(2, res[1].key); } TEST_F(key_value_view_test, remove_all_only_existing) { @@ -732,9 +731,8 @@ TEST_F(key_value_view_test, remove_all_overlapped) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). - EXPECT_EQ(12, res[0].key); - EXPECT_EQ(11, res[1].key); + EXPECT_EQ(11, res[0].key); + EXPECT_EQ(12, res[1].key); } TEST_F(key_value_view_test, remove_all_empty) { @@ -746,8 +744,10 @@ TEST_F(key_value_view_test, remove_all_exact_nonexisting) { auto res = kv_view.remove_all( nullptr, {{test_key_type(1), test_value_type("foo")}, {test_key_type(2), test_value_type("bar")}}); - // TODO: Key order should be preserved by the server (IGNITE-20435). 
ASSERT_EQ(2, res.size()); + + EXPECT_EQ(1, res[0].key); + EXPECT_EQ(2, res[1].key); } TEST_F(key_value_view_test, remove_all_exact_overlapped) { diff --git a/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp b/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp index dbdb2c4eaf..589072567e 100644 --- a/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp +++ b/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp @@ -862,12 +862,11 @@ TEST_F(record_binary_view_test, remove_all_nonexisting_keys_return_all) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). EXPECT_EQ(1, res[0].column_count()); - EXPECT_EQ(2, res[0].get<int64_t>("key")); + EXPECT_EQ(1, res[0].get<int64_t>("key")); EXPECT_EQ(1, res[1].column_count()); - EXPECT_EQ(1, res[1].get<int64_t>("key")); + EXPECT_EQ(2, res[1].get<int64_t>("key")); } TEST_F(record_binary_view_test, remove_all_only_existing) { @@ -897,12 +896,11 @@ TEST_F(record_binary_view_test, remove_all_overlapped) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). EXPECT_EQ(1, res[0].column_count()); - EXPECT_EQ(12, res[0].get<int64_t>("key")); + EXPECT_EQ(11, res[0].get<int64_t>("key")); EXPECT_EQ(1, res[1].column_count()); - EXPECT_EQ(11, res[1].get<int64_t>("key")); + EXPECT_EQ(12, res[1].get<int64_t>("key")); } TEST_F(record_binary_view_test, remove_all_empty) { @@ -913,8 +911,15 @@ TEST_F(record_binary_view_test, remove_all_empty) { TEST_F(record_binary_view_test, remove_all_exact_nonexisting) { auto res = tuple_view.remove_all_exact(nullptr, {get_tuple(1, "foo"), get_tuple(2, "bar")}); - // TODO: Key order should be preserved by the server (IGNITE-20435). 
ASSERT_EQ(2, res.size()); + + EXPECT_EQ(2, res[0].column_count()); + EXPECT_EQ(1, res[0].get<int64_t>("key")); + EXPECT_EQ("foo", res[0].get<std::string>("val")); + + EXPECT_EQ(2, res[1].column_count()); + EXPECT_EQ(2, res[1].get<int64_t>("key")); + EXPECT_EQ("bar", res[1].get<std::string>("val")); } TEST_F(record_binary_view_test, remove_all_exact_overlapped) { diff --git a/modules/platforms/cpp/tests/client-test/record_view_test.cpp b/modules/platforms/cpp/tests/client-test/record_view_test.cpp index 232c9df42c..d4d3318fa9 100644 --- a/modules/platforms/cpp/tests/client-test/record_view_test.cpp +++ b/modules/platforms/cpp/tests/client-test/record_view_test.cpp @@ -920,9 +920,8 @@ TEST_F(record_view_test, remove_all_nonexisting_keys_return_all) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). - EXPECT_EQ(2, res[0].key); - EXPECT_EQ(1, res[1].key); + EXPECT_EQ(1, res[0].key); + EXPECT_EQ(2, res[1].key); } TEST_F(record_view_test, remove_all_only_existing) { @@ -952,9 +951,8 @@ TEST_F(record_view_test, remove_all_overlapped) { EXPECT_EQ(res.size(), 2); - // TODO: Key order should be preserved by the server (IGNITE-20435). - EXPECT_EQ(12, res[0].key); - EXPECT_EQ(11, res[1].key); + EXPECT_EQ(11, res[0].key); + EXPECT_EQ(12, res[1].key); } TEST_F(record_view_test, remove_all_empty) { @@ -965,8 +963,10 @@ TEST_F(record_view_test, remove_all_empty) { TEST_F(record_view_test, remove_all_exact_nonexisting) { auto res = view.remove_all_exact(nullptr, {test_type(1, "foo"), test_type(2, "bar")}); - // TODO: Key order should be preserved by the server (IGNITE-20435). 
ASSERT_EQ(2, res.size()); + + EXPECT_EQ(1, res[0].key); + EXPECT_EQ(2, res[1].key); } TEST_F(record_view_test, remove_all_exact_overlapped) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java similarity index 68% rename from modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java rename to modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java index cab67eedc0..9a7503e73d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java @@ -27,23 +27,26 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.lang.IgniteStringFormatter; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import 
org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.Column; @@ -71,12 +74,10 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; /** - * Tests for the read-only API. + * Tests for the internal table API. */ @ExtendWith(WorkDirectoryExtension.class) -public class ItRoReadsTest extends BaseIgniteAbstractTest { - private static final IgniteLogger LOG = Loggers.forClass(ItRoReadsTest.class); - +public class ItInternalTableTest extends BaseIgniteAbstractTest { private static final String TABLE_NAME = "SOME_TABLE"; private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor( @@ -133,13 +134,9 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { @AfterAll static void stopNode(TestInfo testInfo) throws Exception { - LOG.info("Start tearDown()"); - NODE = null; IgniteUtils.closeAll(() -> IgnitionManager.stop(testNodeName(testInfo, 0))); - - LOG.info("End tearDown()"); } @BeforeEach @@ -235,34 +232,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { tx2.commit(); - Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), node.node()); - - CountDownLatch latch = new CountDownLatch(1); - - List<BinaryRow> list = new ArrayList<>(); - - res.subscribe(new Subscriber<>() { - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(100); - } - - @Override - public void onNext(BinaryRow item) { - list.add(item); - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onComplete() { - latch.countDown(); - } - }); - - latch.await(); + List<BinaryRow> list = scanAllPartitions(node); assertThat(list, contains(equalToRow(keyValueRow2))); } @@ -384,78 +354,206 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest { roScanAll(false); } - private void roScanAll(boolean implicit) throws InterruptedException { - IgniteImpl node = node(); + @Test 
+ public void getAllOrderTest() { + List<BinaryRowEx> keyRows = populateEvenKeysAndPrepareEntriesToLookup(true); InternalTable internalTable = ((TableImpl) table).internalTable(); + SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema(); - KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + CompletableFuture<List<BinaryRow>> getAllFut = internalTable.getAll(keyRows, null); - Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), node.node()); + assertThat(getAllFut, willCompleteSuccessfully()); - var subscriberAllDataAwaitLatch = new CountDownLatch(1); + List<BinaryRow> res = getAllFut.join(); - var retrievedItems = new ArrayList<BinaryRow>(); + assertEquals(keyRows.size(), res.size()); - res.subscribe(new Subscriber<>() { - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(10000); - } + Iterator<BinaryRow> resIter = res.iterator(); - @Override - public void onNext(BinaryRow item) { - retrievedItems.add(item); - } + for (BinaryRowEx key : keyRows) { + int i = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue(); - @Override - public void onError(Throwable throwable) { - fail("onError call is not expected."); - } + BinaryRow resRow = resIter.next(); + + if (i % 2 == 1) { + assertNull(resRow); + } else { + assertNotNull(resRow); + + Tuple rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resRow)); - @Override - public void onComplete() { - subscriberAllDataAwaitLatch.countDown(); + assertEquals(i % 100L, rowTuple.<Long>value("key")); + assertEquals(i, rowTuple.<Integer>value("valInt")); + assertEquals("some string row" + i, rowTuple.<Integer>value("valStr")); } - }); + } + } - subscriberAllDataAwaitLatch.await(); + @Test + public void deleteAllOrderTest() { + List<BinaryRowEx> keyRows = populateEvenKeysAndPrepareEntriesToLookup(true); - assertEquals(0, retrievedItems.size()); + InternalTable internalTable = ((TableImpl) 
table).internalTable(); + SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema(); - populateData(node, keyValueView, implicit); + CompletableFuture<List<BinaryRow>> deleteAllFut = internalTable.deleteAll(keyRows, null); + + assertThat(deleteAllFut, willCompleteSuccessfully()); + + List<BinaryRow> res = deleteAllFut.join(); - res = internalTable.scan(0, node.clock().now(), node.node()); + Iterator<BinaryRow> resIter = res.iterator(); - var subscriberAllDataAwaitLatch2 = new CountDownLatch(1); + for (BinaryRowEx key : keyRows) { + int i = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue(); - res.subscribe(new Subscriber<>() { - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(10000); + if (i % 2 == 1) { + Tuple rowTuple = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, resIter.next())); + + assertEquals(i % 100L, rowTuple.<Long>value("key")); } + } + } - @Override - public void onNext(BinaryRow item) { - retrievedItems.add(item); + @Test + public void deleteAllExactOrderTest() { + List<BinaryRowEx> rowsToLookup = populateEvenKeysAndPrepareEntriesToLookup(false); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema(); + + CompletableFuture<List<BinaryRow>> deleteAllExactFut = internalTable.deleteAllExact(rowsToLookup, null); + + assertThat(deleteAllExactFut, willCompleteSuccessfully()); + + List<BinaryRow> res = deleteAllExactFut.join(); + + Iterator<BinaryRow> resIter = res.iterator(); + + for (BinaryRowEx key : rowsToLookup) { + int i = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue(); + + if (i % 2 == 1) { + Tuple rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next())); + + assertEquals(i % 100L, rowTuple.<Long>value("key")); + assertEquals(i, rowTuple.<Integer>value("valInt")); + 
assertEquals("some string row" + i, rowTuple.<Integer>value("valStr")); } + } + } - @Override - public void onError(Throwable throwable) { - fail("onError call is not expected."); + @Test + public void insertAllOrderTest() { + List<BinaryRowEx> rowsToLookup = populateEvenKeysAndPrepareEntriesToLookup(false); + + InternalTable internalTable = ((TableImpl) table).internalTable(); + SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema(); + + CompletableFuture<List<BinaryRow>> insertAllFut = internalTable.insertAll(rowsToLookup, null); + + assertThat(insertAllFut, willCompleteSuccessfully()); + + List<BinaryRow> res = insertAllFut.join(); + + Iterator<BinaryRow> resIter = res.iterator(); + + for (BinaryRowEx key : rowsToLookup) { + int i = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue(); + + if (i % 2 == 0) { + Tuple rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next())); + + assertEquals(i % 100L, rowTuple.<Long>value("key")); + assertEquals(i, rowTuple.<Integer>value("valInt")); + assertEquals("some string row" + i, rowTuple.<Integer>value("valStr")); } + } + } - @Override - public void onComplete() { - subscriberAllDataAwaitLatch2.countDown(); + private ArrayList<BinaryRowEx> populateEvenKeysAndPrepareEntriesToLookup(boolean keyOnly) { + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + var keyRows = new ArrayList<BinaryRowEx>(); + + for (int i = 0; i < 15; i++) { + keyRows.add(keyOnly ? 
createKeyRow(i) : createKeyValueRow(i, i, "some string row" + i)); + + if (i % 2 == 0) { + putValue(keyValueView, i); } - }); + } + + Collections.shuffle(keyRows); - subscriberAllDataAwaitLatch2.await(); + return keyRows; + } + + private void roScanAll(boolean implicit) throws InterruptedException { + IgniteImpl node = node(); + + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + List<BinaryRow> retrievedItems = scanAllPartitions(node); + + assertEquals(0, retrievedItems.size()); + + populateData(node, keyValueView, implicit); + + retrievedItems = scanAllPartitions(node); assertEquals(15, retrievedItems.size()); } + + /** + * Scans all table entries. + * + * @param node Ignite instance. + * @return Collection with all rows. + * @throws InterruptedException If fail. + */ + private static List<BinaryRow> scanAllPartitions(IgniteImpl node) throws InterruptedException { + InternalTable internalTable = ((TableImpl) node.tables().table(TABLE_NAME)).internalTable(); + + List<BinaryRow> retrievedItems = new CopyOnWriteArrayList<>(); + + int parts = internalTable.partitions(); + + var subscriberAllDataAwaitLatch = new CountDownLatch(parts); + + for (int i = 0; i < parts; i++) { + Publisher<BinaryRow> res = internalTable.scan(i, node.clock().now(), node.node()); + + res.subscribe(new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(10000); + } + + @Override + public void onNext(BinaryRow item) { + retrievedItems.add(item); + } + + @Override + public void onError(Throwable throwable) { + fail("onError call is not expected."); + } + + @Override + public void onComplete() { + subscriberAllDataAwaitLatch.countDown(); + } + }); + } + + assertTrue(subscriberAllDataAwaitLatch.await(10, TimeUnit.SECONDS)); + + return retrievedItems; + } + private static Row createKeyValueRow(long id, int value, String str) { RowAssembler rowBuilder = new RowAssembler(SCHEMA_1, -1); @@ -506,7 +604,7 @@ public class 
ItRoReadsTest extends BaseIgniteAbstractTest { String zoneName = zoneNameForTable(tableName); try (Session session = node.sql().createSession()) { - session.execute(null, String.format("create zone \"%s\" with partitions=1, replicas=%d", zoneName, DEFAULT_REPLICA_COUNT)); + session.execute(null, String.format("create zone \"%s\" with partitions=3, replicas=%d", zoneName, DEFAULT_REPLICA_COUNT)); session.execute(null, String.format( diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java index 44d54ab469..d1b92dbe4f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.sql.engine.exec; import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes; +import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectRejectedRowsResponsesWithRestoreOrder; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMess import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; +import org.apache.ignite.internal.table.distributed.storage.RowBatch; import org.apache.ignite.internal.type.NativeTypeSpec; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.sql.SqlException; @@ -251,34 +253,23 @@ public final class 
UpdatableTableImpl implements UpdatableTable { RowHandler<RowT> handler = ectx.rowHandler(); - Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new Int2ObjectOpenHashMap<>(); + Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(handler, rows); - for (RowT row : rows) { - BinaryRowEx binaryRow = convertRow(row, ectx.rowHandler()); + for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) { + int partitionId = partitionRowBatch.getIntKey(); + RowBatch rowBatch = partitionRowBatch.getValue(); - rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow); - } - - CompletableFuture<List<RowT>>[] futures = new CompletableFuture[rowsByPartition.size()]; - - int batchNum = 0; - - IgniteTypeFactory typeFactory = ectx.getTypeFactory(); - RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(desc.insertRowType(typeFactory))); - RowHandler.RowFactory<RowT> rowFactory = handler.factory(rowSchema); - - for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows : rowsByPartition.int2ObjectEntrySet()) { - TablePartitionId partGroupId = new TablePartitionId(tableId, partToRows.getIntKey()); + TablePartitionId partGroupId = new TablePartitionId(tableId, partitionId); ColocationGroup group = ectx.group(MODIFY_NODE_SOURCE_ID); assert group != null; - NodeWithTerm nodeWithTerm = group.assignments().get(partToRows.getIntKey()); + NodeWithTerm nodeWithTerm = group.assignments().get(partitionId); ReadWriteMultiRowReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() .groupId(partGroupId) .commitPartitionId(serializeTablePartitionId(commitPartitionId)) - .binaryRowMessages(serializeBinaryRows(partToRows.getValue())) + .binaryRowMessages(serializeBinaryRows(rowBatch.requestedRows)) .transactionId(txAttributes.id()) .term(nodeWithTerm.term()) .requestType(RequestType.RW_INSERT_ALL) @@ -286,25 +277,30 @@ public final class 
UpdatableTableImpl implements UpdatableTable { .skipDelayedAck(true) .build(); - futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(), request) - .thenApply(result -> { - Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result; + rowBatch.resultFuture = replicaService.invoke(nodeWithTerm.name(), request); + } - if (binaryRows.isEmpty()) { - return List.of(); - } + return handleInsertResults(ectx, rowBatchByPartitionId.values()); + } - List<RowT> conflictRows = new ArrayList<>(binaryRows.size()); + /** + * Creates batches of rows for processing, grouped by partition ID. + * + * @param rowHandler Row handler. + * @param rows Rows. + */ + private <T> Int2ObjectMap<RowBatch> toRowBatchByPartitionId(RowHandler<T> rowHandler, List<T> rows) { + Int2ObjectMap<RowBatch> rowBatchByPartitionId = new Int2ObjectOpenHashMap<>(); - for (BinaryRow row : binaryRows) { - conflictRows.add(rowConverter.toRow(ectx, row, rowFactory)); - } + int i = 0; - return conflictRows; - }); + for (T row : rows) { + BinaryRowEx binaryRow = convertRow(row, rowHandler); + + rowBatchByPartitionId.computeIfAbsent(partitionExtractor.fromRow(binaryRow), partitionId -> new RowBatch()).add(binaryRow, i++); } - return handleInsertResults(handler, futures); + return rowBatchByPartitionId; } /** {@inheritDoc} */ @@ -387,35 +383,28 @@ public final class UpdatableTableImpl implements UpdatableTable { rowAssembler.appendValue(val); } - private static <RowT> CompletableFuture<List<RowT>> handleInsertResults( - RowHandler<RowT> handler, - CompletableFuture<List<RowT>>[] futs + private <RowT> CompletableFuture<List<RowT>> handleInsertResults( + ExecutionContext<RowT> ectx, + Collection<RowBatch> batches ) { - return CompletableFuture.allOf(futs) + return collectRejectedRowsResponsesWithRestoreOrder(batches) .thenApply(response -> { - List<String> conflictRows = null; - - for (CompletableFuture<List<RowT>> future : futs) { - List<RowT> values = future.join(); - - if (nullOrEmpty(values)) { - 
continue; - } + if (nullOrEmpty(response)) { + return null; + } - if (conflictRows == null) { - conflictRows = new ArrayList<>(values.size()); - } + RowHandler<RowT> handler = ectx.rowHandler(); + IgniteTypeFactory typeFactory = ectx.getTypeFactory(); + RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(desc.insertRowType(typeFactory))); + RowHandler.RowFactory<RowT> rowFactory = handler.factory(rowSchema); - for (RowT row : values) { - conflictRows.add(handler.toString(row)); - } - } + ArrayList<String> conflictRows = new ArrayList<>(response.size()); - if (conflictRows != null) { - throw conflictKeysException(conflictRows); + for (BinaryRow row : response) { + conflictRows.add(handler.toString(rowConverter.toRow(ectx, row, rowFactory))); } - return null; + throw conflictKeysException(conflictRows); }); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 4d870688f4..e7afd3ee1d 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NullBinaryRow; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException; @@ -183,7 +184,9 @@ public class ItColocationTest extends BaseIgniteAbstractTest { }); if (cmd instanceof UpdateAllCommand) { - return completedFuture(List.of()); + return completedFuture(((UpdateAllCommand) 
cmd).rowsToUpdate().keySet().stream() + .map(uuid -> new NullBinaryRow()) + .collect(Collectors.toList())); } else { return completedFuture(true); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index 9c2e2d4fe9..1b588578da 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -178,10 +178,12 @@ public interface InternalTable extends ManuallyCloseable { * Asynchronously insert rows into the table which do not exist, skipping existed ones. * * @param rows Rows to insert into the table. - * @param tx The transaction. - * @return Future representing pending completion of the operation. + * @param tx The transaction. + * @return Future represents the pending completion of the operation, with rejected rows for insertion in the result. The order of + * collection elements is guaranteed to be the same as the order of {@code rows}. If a record is inserted, the element will be + * excluded from the collection result. */ - CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx); + CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx); /** * Asynchronously replaces an existed row associated with the same key columns values as the given one has. @@ -243,19 +245,23 @@ public interface InternalTable extends ManuallyCloseable { * Asynchronously remove rows with the same key columns values as the given one has from the table. * * @param rows Rows with key columns set. - * @param tx The transaction. - * @return Future representing pending completion of the operation. + * @param tx The transaction. + * @return Future represents the pending completion of the operation, with rejected rows for deletion in the result. 
The order of + * collection elements is guaranteed to be the same as the order of {@code rows}. If a record is deleted, the element will be + * excluded from the collection result. */ - CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx); + CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx); /** * Asynchronously remove given rows from the table. * * @param rows Rows to delete. - * @param tx The transaction. - * @return Future representing pending completion of the operation. + * @param tx The transaction. + * @return Future represents the pending completion of the operation, with rejected rows for deletion in the result. The order of + * collection elements is guaranteed to be the same as the order of {@code rows}. If a record is deleted, the element will be + * excluded from the collection result. */ - CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx); + CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx); /** * Returns a partition for a key. 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java index 6920f5fe92..7284c12a2b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java @@ -176,13 +176,13 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie /** {@inheritDoc} */ @Override - public Collection<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> recs) { + public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> recs) { return sync(insertAllAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) { + public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) { Objects.requireNonNull(recs); return withSchemaSync(tx, (schemaVersion) -> { @@ -303,13 +303,13 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie /** {@inheritDoc} */ @Override - public Collection<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> keyRecs) { + public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> keyRecs) { return sync(deleteAllAsync(tx, keyRecs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) { + public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) { Objects.requireNonNull(keyRecs); return withSchemaSync(tx, (schemaVersion) -> { @@ -320,13 +320,13 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie /** {@inheritDoc} */ @Override - public Collection<Tuple> deleteAllExact(@Nullable Transaction tx, Collection<Tuple> 
recs) { + public List<Tuple> deleteAllExact(@Nullable Transaction tx, Collection<Tuple> recs) { return sync(deleteAllExactAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, Collection<Tuple> recs) { + public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, Collection<Tuple> recs) { Objects.requireNonNull(recs); return withSchemaSync(tx, (schemaVersion) -> { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java index 27af10db67..4e680c6b65 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java @@ -171,13 +171,13 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R /** {@inheritDoc} */ @Override - public Collection<R> insertAll(@Nullable Transaction tx, Collection<R> recs) { + public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) { return sync(insertAllAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs) { + public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs) { Objects.requireNonNull(recs); return withSchemaSync(tx, (schemaVersion) -> { @@ -299,13 +299,13 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R /** {@inheritDoc} */ @Override - public Collection<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) { + public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) { return sync(deleteAllAsync(tx, keyRecs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs) { + public 
CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs) { Objects.requireNonNull(keyRecs); return withSchemaSync(tx, (schemaVersion) -> { @@ -317,13 +317,13 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R /** {@inheritDoc} */ @Override - public Collection<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs) { + public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs) { return sync(deleteAllExactAsync(tx, recs)); } /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> keyRecs) { + public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> keyRecs) { Objects.requireNonNull(keyRecs); return withSchemaSync(tx, (schemaVersion) -> { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 549499fd3e..3a58e43c80 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1772,6 +1772,7 @@ public class PartitionReplicaListener implements ReplicaListener { return allOf(deleteExactLockFuts).thenCompose(ignore -> { Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>(); + // TODO:IGNITE-20669 Replace the result to BitSet. 
Collection<BinaryRow> result = new ArrayList<>(); for (int i = 0; i < searchRows.size(); i++) { @@ -1779,8 +1780,10 @@ public class PartitionReplicaListener implements ReplicaListener { if (lockedRowId != null) { rowIdsToDelete.put(lockedRowId.uuid(), null); + + result.add(new NullBinaryRow()); } else { - result.add(searchRows.get(i)); + result.add(null); } } @@ -1815,6 +1818,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(pkReadLockFuts).thenCompose(ignore -> { + // TODO:IGNITE-20669 Replace the result to BitSet. Collection<BinaryRow> result = new ArrayList<>(); Map<RowId, BinaryRow> rowsToInsert = new HashMap<>(); Set<ByteBuffer> uniqueKeys = new HashSet<>(); @@ -1825,8 +1829,10 @@ public class PartitionReplicaListener implements ReplicaListener { if (lockedRow == null && uniqueKeys.add(pks.get(i).byteBuffer())) { rowsToInsert.put(new RowId(partId(), UUID.randomUUID()), row); + + result.add(new NullBinaryRow()); } else { - result.add(row); + result.add(null); } } @@ -2007,6 +2013,7 @@ public class PartitionReplicaListener implements ReplicaListener { return allOf(rowIdLockFuts).thenCompose(ignore -> { Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>(); + // TODO:IGNITE-20669 Replace the result to BitSet. 
Collection<BinaryRow> result = new ArrayList<>(); for (CompletableFuture<RowId> lockFut : rowIdLockFuts) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index a5f84c2a2f..cea79a364e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -38,7 +38,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -94,7 +93,6 @@ import org.apache.ignite.internal.table.distributed.replication.request.Multiple import org.apache.ignite.internal.table.distributed.replication.request.MultipleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest; -import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder; import org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest; @@ -475,7 +473,7 @@ public class InternalTableImpl implements InternalTable { if (e2 != null) { return result.completeExceptionally(e2); } else { - return result.complete((R) r2); + return result.complete(r2); } }); } @@ -992,7 +990,7 @@ public class InternalTableImpl implements 
InternalTable { /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) { + public CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) { return enlistInTx( rows, tx, @@ -1006,11 +1004,16 @@ public class InternalTableImpl implements InternalTable { .timestampLong(clock.nowLong()) .full(full) .build(), - InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder, + InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder, (res, req) -> { - ReadWriteMultiRowReplicaRequest r = (ReadWriteMultiRowReplicaRequest) req; + for (BinaryRow row : res) { + if (row != null) { + return false; + } + } - return res.size() == r.binaryRowMessages().size(); + // All values are null, this means nothing was deleted. + return true; } ); } @@ -1138,7 +1141,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, InternalTransaction tx) { + public CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, InternalTransaction tx) { return enlistInTx( rows, tx, @@ -1152,24 +1155,7 @@ public class InternalTableImpl implements InternalTable { .timestampLong(clock.nowLong()) .full(full) .build(), - rowBatches -> allResultFutures(rowBatches).thenApply(v -> { - List<BinaryRow> result = new ArrayList<>(); - - for (RowBatch batch : rowBatches) { - List<BinaryRow> requestedRows = batch.requestedRows; - List<BinaryRow> response = (List<BinaryRow>) batch.resultFuture.join(); - - assert requestedRows.size() == response.size(); - - for (int i = 0; i < requestedRows.size(); i++) { - if (response.get(i) == null) { - result.add(requestedRows.get(i)); - } - } - } - - return result; - }), + InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder, (res, req) -> { for (BinaryRow row : res) { if (row != null) { @@ 
-1185,7 +1171,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override - public CompletableFuture<Collection<BinaryRow>> deleteAllExact( + public CompletableFuture<List<BinaryRow>> deleteAllExact( Collection<BinaryRowEx> rows, InternalTransaction tx ) { @@ -1202,11 +1188,16 @@ public class InternalTableImpl implements InternalTable { .timestampLong(clock.nowLong()) .full(full) .build(), - InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder, + InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder, (res, req) -> { - ReadWriteMultiRowReplicaRequest r = (ReadWriteMultiRowReplicaRequest) req; + for (BinaryRow row : res) { + if (row != null) { + return false; + } + } - return res.size() == r.binaryRowMessages().size(); + // All values are null, this means nothing was deleted. + return true; } ); } @@ -1554,44 +1545,66 @@ public class InternalTableImpl implements InternalTable { } /** - * Gathers the result of batch processing into a single resulting collection of rows. + * Gets a batch result. * * @param rowBatches Row batches. * @return Future of collecting results. 
*/ - static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(Collection<RowBatch> rowBatches) { - return allResultFutures(rowBatches) - .thenApply(response -> { - var result = new ArrayList<BinaryRow>(rowBatches.size()); - - for (RowBatch rowBatch : rowBatches) { - Collection<BinaryRow> batchResult = (Collection<BinaryRow>) rowBatch.getCompletedResult(); + public static CompletableFuture<List<BinaryRow>> collectRejectedRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) { + return collectMultiRowsResponsesWithRestoreOrder( + rowBatches, + batch -> { + List<BinaryRow> result = new ArrayList<>(); + List<BinaryRow> response = (List<BinaryRow>) batch.getCompletedResult(); - if (batchResult == null) { - continue; - } + assert batch.requestedRows.size() == response.size() : + "Replication response does not fit to request [requestRows=" + batch.requestedRows.size() + + "responseRows=" + response.size() + ']'; - result.addAll(batchResult); + for (int i = 0; i < response.size(); i++) { + result.add(response.get(i) != null ? null : batch.requestedRows.get(i)); } return result; - }); + }, + true + ); + } + + /** + * Gets a batch result. + * + * @param rowBatches Row batches. + * @return Future of collecting results. + */ + static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) { + return collectMultiRowsResponsesWithRestoreOrder( + rowBatches, + batch -> (Collection<BinaryRow>) batch.getCompletedResult(), + false + ); } /** * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of * rows. * - * @param rowBatches Row batches by partition ID. + * @param rowBatches Row batches. + * @param bathResultMapper Map a batch to the result collection of binary rows. + * @param skipNull True to skip the null in result collection, false otherwise. * @return Future of collecting results. 
*/ - static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) { + private static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder( + Collection<RowBatch> rowBatches, + Function<RowBatch, Collection<BinaryRow>> bathResultMapper, + boolean skipNull + ) { return allResultFutures(rowBatches) .thenApply(response -> { var result = new BinaryRow[RowBatch.getTotalRequestedRowSize(rowBatches)]; for (RowBatch rowBatch : rowBatches) { - Collection<BinaryRow> batchResult = (Collection<BinaryRow>) rowBatch.getCompletedResult(); + Collection<BinaryRow> batchResult = bathResultMapper.apply(rowBatch); assert batchResult != null; @@ -1605,8 +1618,15 @@ public class InternalTableImpl implements InternalTable { } } - // Use Arrays#asList to avoid copying the array. - return Arrays.asList(result); + ArrayList<BinaryRow> resultToReturn = new ArrayList<>(); + + for (BinaryRow row : result) { + if (!skipNull || row != null) { + resultToReturn.add(row); + } + } + + return resultToReturn; }); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java index 27b97b19f0..0b2a6feea3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java @@ -31,9 +31,9 @@ import org.jetbrains.annotations.Nullable; * * <p>NOTE: Not thread-safe. */ -class RowBatch { +public class RowBatch { /** Batch of rows from the original collection of rows. */ - final List<BinaryRow> requestedRows = new ArrayList<>(); + public final List<BinaryRow> requestedRows = new ArrayList<>(); /** Order of the rows from the {@link #requestedRows} in the original row collection. 
*/ final IntList originalRowOrder = new IntArrayList(); @@ -41,9 +41,9 @@ class RowBatch { /** * Future of the result of processing the {@link #requestedRows}, {@code null} if not set and may return {@code null}. */ - @Nullable CompletableFuture<?> resultFuture; + public @Nullable CompletableFuture<?> resultFuture; - void add(BinaryRow row, int originalIndex) { + public void add(BinaryRow row, int originalIndex) { requestedRows.add(row); originalRowOrder.add(originalIndex); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java index 8e3fb9740f..f06e86eede 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java @@ -19,12 +19,11 @@ package org.apache.ignite.internal.table.distributed.storage; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder; -import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithoutRestoreOrder; +import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectRejectedRowsResponsesWithRestoreOrder; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -35,12 +34,14 @@ import 
static org.mockito.Mockito.when; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; +import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.BinaryRowEx; +import org.apache.ignite.internal.schema.NullBinaryRow; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.tx.HybridTimestampTracker; @@ -151,9 +152,39 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest { willBe(equalTo(originalRows)) ); + var part1 = new ArrayList<>(2); + + part1.add(null); + part1.add(new NullBinaryRow()); + + rowBatchByPartitionId.get(0).resultFuture = completedFuture(part1); + + var part2 = new ArrayList<>(2); + + part2.add(null); + + rowBatchByPartitionId.get(1).resultFuture = completedFuture(part2); + + var part3 = new ArrayList<>(2); + + part3.add(new NullBinaryRow()); + part3.add(null); + part3.add(new NullBinaryRow()); + + rowBatchByPartitionId.get(2).resultFuture = completedFuture(part3); + + List<BinaryRowEx> rejectedRows = List.of( + // Rows for 0 partition. + originalRows.get(0), + // Rows for 1 partition. + originalRows.get(2), + // Rows for 2 partition. 
+ originalRows.get(4) + ); + assertThat( - collectMultiRowsResponsesWithoutRestoreOrder(rowBatchByPartitionId.values()), - willBe(hasItems(originalRows.toArray(BinaryRowEx[]::new))) + collectRejectedRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values()), + willBe(equalTo(rejectedRows)) ); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index abd834852b..c3d191a568 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -218,6 +218,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { * ReplicaService by itself. * @param transactionStateResolver Transaction state resolver. * @param schema Schema descriptor. + * @param tracker Observable timestamp tracker. */ public DummyInternalTableImpl( ReplicaService replicaSvc, @@ -242,7 +243,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { tracker, TEST_PLACEMENT_DRIVER ); - RaftGroupService svc = raftGroupServiceByPartitionId.get(0); + RaftGroupService svc = raftGroupServiceByPartitionId.get(PART_ID); groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : crossTableGroupId;