This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c60f052bd0 IGNITE-20435 Preserve key order in batch opperations 
(deleteAll, deleteAllExact, insertAll) (#2664)
c60f052bd0 is described below

commit c60f052bd074cad30c753cc1903018a45362a34b
Author: Vladislav Pyatkov <vldpyat...@gmail.com>
AuthorDate: Tue Oct 17 12:08:21 2023 +0300

    IGNITE-20435 Preserve key order in batch opperations (deleteAll, 
deleteAllExact, insertAll) (#2664)
---
 .../java/org/apache/ignite/table/RecordView.java   |  45 ++--
 .../client/table/ClientRecordBinaryView.java       |  14 +-
 .../internal/client/table/ClientRecordView.java    |  12 +-
 .../client/table/ClientTupleSerializer.java        |   4 +-
 .../ignite/client/fakes/FakeInternalTable.java     |  33 +--
 .../client-test/key_value_binary_view_test.cpp     |  19 +-
 .../cpp/tests/client-test/key_value_view_test.cpp  |  14 +-
 .../tests/client-test/record_binary_view_test.cpp  |  19 +-
 .../cpp/tests/client-test/record_view_test.cpp     |  14 +-
 ...ItRoReadsTest.java => ItInternalTableTest.java} | 262 ++++++++++++++-------
 .../sql/engine/exec/UpdatableTableImpl.java        |  95 ++++----
 .../ignite/internal/table/ItColocationTest.java    |   5 +-
 .../ignite/internal/table/InternalTable.java       |  24 +-
 .../internal/table/RecordBinaryViewImpl.java       |  12 +-
 .../ignite/internal/table/RecordViewImpl.java      |  12 +-
 .../replicator/PartitionReplicaListener.java       |  11 +-
 .../distributed/storage/InternalTableImpl.java     | 116 +++++----
 .../table/distributed/storage/RowBatch.java        |   8 +-
 .../distributed/storage/InternalTableImplTest.java |  39 ++-
 .../table/impl/DummyInternalTableImpl.java         |   3 +-
 20 files changed, 454 insertions(+), 307 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java 
b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java
index e80a9da595..e95fc98b2e 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java
@@ -143,20 +143,23 @@ public interface RecordView<R> extends 
DataStreamerTarget<R> {
     /**
      * Inserts into a table records that do not exist, skips those that exist.
      *
-     * @param tx   Transaction or {@code null} to auto-commit.
+     * @param tx Transaction or {@code null} to auto-commit.
      * @param recs Records to insert into the table. The records cannot be 
{@code null}.
-     * @return Skipped records.
+     * @return Skipped records. The order of collection elements is guaranteed 
to be the same as the order of {@code recs}. If a record is
+     *         inserted, the element will be excluded from the collection 
result.
      */
-    Collection<R> insertAll(@Nullable Transaction tx, Collection<R> recs);
+    List<R> insertAll(@Nullable Transaction tx, Collection<R> recs);
 
     /**
      * Asynchronously inserts into a table records that do not exist, skips 
those that exist.
      *
-     * @param tx   Transaction or {@code null} to auto-commit.
+     * @param tx Transaction or {@code null} to auto-commit.
      * @param recs Records to insert into the table. The records cannot be 
{@code null}.
-     * @return Future representing pending completion of the operation.
+     * @return Future representing pending completion of the operation, with 
rejected rows for insertion in the result. The order of
+     *         collection elements is guaranteed to be the same as the order 
of {@code recs}. If a record is inserted, the element will be
+     *         excluded from the collection result.
      */
-    CompletableFuture<Collection<R>> insertAllAsync(@Nullable Transaction tx, 
Collection<R> recs);
+    CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, 
Collection<R> recs);
 
     /**
      * Replaces an existing record associated with the same key column values 
as the given record.
@@ -272,36 +275,42 @@ public interface RecordView<R> extends 
DataStreamerTarget<R> {
     /**
      * Removes from a table records with the same key column values as the 
given one.
      *
-     * @param tx      Transaction or {@code null} to auto-commit.
+     * @param tx Transaction or {@code null} to auto-commit.
      * @param keyRecs Records with the key columns set. The records cannot be 
{@code null}.
-     * @return Records with the key columns set that did not exist.
+     * @return Records with the key columns set that did not exist. The order 
of collection elements is guaranteed to be the same as the
+     *         order of {@code keyRecs}. If a record is removed, the element 
will be excluded from the collection result.
      */
-    Collection<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs);
+    List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs);
 
     /**
      * Asynchronously removes from a table records with the same key column 
values as the given one.
      *
-     * @param tx      Transaction or {@code null} to auto-commit.
+     * @param tx Transaction or {@code null} to auto-commit.
      * @param keyRecs Records with the key columns set. The records cannot be 
{@code null}.
-     * @return Future that represents the pending completion of the operation.
+     * @return Future represents the pending completion of the operation, with 
rejected rows for deletion in the result. The order of
+     *         collection elements is guaranteed to be the same as the order 
of {@code keyRecs}. If a record is removed, the element will be
+     *         excluded from the collection result.
      */
-    CompletableFuture<Collection<R>> deleteAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs);
+    CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs);
 
     /**
      * Remove the given records from a table.
      *
-     * @param tx   Transaction or {@code null} to auto-commit.
+     * @param tx Transaction or {@code null} to auto-commit.
      * @param recs Records to delete. The records cannot be {@code null}.
-     * @return Records that were not deleted.
+     * @return Records that were not deleted. The order of collection elements 
is guaranteed to be the same as the order of {@code recs}. If
+     *         a record is removed, the element will be excluded from the 
collection result.
      */
-    Collection<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs);
+    List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> recs);
 
     /**
      * Asynchronously removes the given records from a table.
      *
-     * @param tx   Transaction or {@code null} to auto-commit.
+     * @param tx Transaction or {@code null} to auto-commit.
      * @param recs Records to delete. The records cannot be {@code null}.
-     * @return Future that represents the pending completion of the operation.
+     * @return Future represents the pending completion of the operation, with 
rejected rows for deletion in the result. The order of
+     *         collection elements is guaranteed to be the same as the order 
of {@code recs}. If a record is removed, the element will be
+     *         excluded from the collection result.
      */
-    CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable Transaction 
tx, Collection<R> recs);
+    CompletableFuture<List<R>> deleteAllExactAsync(@Nullable Transaction tx, 
Collection<R> recs);
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index 3610d15c06..5c7d17c6ec 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -178,17 +178,17 @@ public class ClientRecordBinaryView implements 
RecordView<Tuple> {
 
     /** {@inheritDoc} */
     @Override
-    public Collection<Tuple> insertAll(@Nullable Transaction tx, 
Collection<Tuple> recs) {
+    public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> 
recs) {
         return sync(insertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
+    public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction 
tx, Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
         if (recs.isEmpty()) {
-            return CompletableFuture.completedFuture(Collections.emptyList());
+            return completedFuture(Collections.emptyList());
         }
 
         return tbl.doSchemaOutInOpAsync(
@@ -315,13 +315,13 @@ public class ClientRecordBinaryView implements 
RecordView<Tuple> {
 
     /** {@inheritDoc} */
     @Override
-    public Collection<Tuple> deleteAll(@Nullable Transaction tx, 
Collection<Tuple> keyRecs) {
+    public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> 
keyRecs) {
         return sync(deleteAllAsync(tx, keyRecs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable 
Transaction tx, Collection<Tuple> keyRecs) {
+    public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction 
tx, Collection<Tuple> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
         if (keyRecs.isEmpty()) {
@@ -338,13 +338,13 @@ public class ClientRecordBinaryView implements 
RecordView<Tuple> {
 
     /** {@inheritDoc} */
     @Override
-    public Collection<Tuple> deleteAllExact(@Nullable Transaction tx, 
Collection<Tuple> recs) {
+    public List<Tuple> deleteAllExact(@Nullable Transaction tx, 
Collection<Tuple> recs) {
         return sync(deleteAllExactAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
+    public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
         if (recs.isEmpty()) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
index 0b3f514a6d..33745d7fbc 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
@@ -178,13 +178,13 @@ public class ClientRecordView<R> implements RecordView<R> 
{
 
     /** {@inheritDoc} */
     @Override
-    public Collection<R> insertAll(@Nullable Transaction tx, Collection<R> 
recs) {
+    public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) {
         return sync(insertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<R>> insertAllAsync(@Nullable 
Transaction tx, Collection<R> recs) {
+    public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, 
Collection<R> recs) {
         Objects.requireNonNull(recs);
 
         if (recs.isEmpty()) {
@@ -312,13 +312,13 @@ public class ClientRecordView<R> implements RecordView<R> 
{
 
     /** {@inheritDoc} */
     @Override
-    public Collection<R> deleteAll(@Nullable Transaction tx, Collection<R> 
keyRecs) {
+    public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) {
         return sync(deleteAllAsync(tx, keyRecs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<R>> deleteAllAsync(@Nullable 
Transaction tx, Collection<R> keyRecs) {
+    public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
         if (keyRecs.isEmpty()) {
@@ -335,13 +335,13 @@ public class ClientRecordView<R> implements RecordView<R> 
{
 
     /** {@inheritDoc} */
     @Override
-    public Collection<R> deleteAllExact(@Nullable Transaction tx, 
Collection<R> recs) {
+    public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> 
recs) {
         return sync(deleteAllExactAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<R> recs) {
+    public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<R> recs) {
         Objects.requireNonNull(recs);
 
         if (recs.isEmpty()) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 63f6901ff5..e5a7d683b9 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -312,11 +312,11 @@ public class ClientTupleSerializer {
         return res;
     }
 
-    static Collection<Tuple> readTuples(ClientSchema schema, 
ClientMessageUnpacker in) {
+    static List<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker 
in) {
         return readTuples(schema, in, false);
     }
 
-    static Collection<Tuple> readTuples(ClientSchema schema, 
ClientMessageUnpacker in, boolean keyOnly) {
+    static List<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker 
in, boolean keyOnly) {
         var cnt = in.unpackInt();
         var res = new ArrayList<Tuple>(cnt);
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 44aef27200..02e382a279 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -79,13 +79,11 @@ public class FakeInternalTable implements InternalTable {
         this.keyExtractor = keyExtractor;
     }
 
-    /** {@inheritDoc} */
     @Override
     public MvTableStorage storage() {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
-    /** {@inheritDoc} */
     @Override
     public int partitions() {
         return 1;
@@ -96,19 +94,16 @@ public class FakeInternalTable implements InternalTable {
         return tableId;
     }
 
-    /** {@inheritDoc} */
     @Override
     public String name() {
         return tableName;
     }
 
-    /** {@inheritDoc} */
     @Override
     public int partitionId(BinaryRowEx row) {
         return 0;
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable 
InternalTransaction tx) {
         return completedFuture(getImpl(keyRow.tupleSlice(), keyRow));
@@ -128,7 +123,6 @@ public class FakeInternalTable implements InternalTable {
         return data.get(key);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> 
keyRows, @Nullable InternalTransaction tx) {
         var res = new ArrayList<BinaryRow>();
@@ -156,7 +150,6 @@ public class FakeInternalTable implements InternalTable {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable 
InternalTransaction tx) {
         upsertImpl(keyExtractor.extractColumns(row), row);
@@ -170,7 +163,6 @@ public class FakeInternalTable implements InternalTable {
         data.put(key.byteBuffer(), row);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, 
@Nullable InternalTransaction tx) {
         for (var row : rows) {
@@ -181,13 +173,11 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(null);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int 
partition) {
         throw new UnsupportedOperationException();
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
@@ -202,7 +192,6 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(res);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable 
InternalTransaction tx) {
         BinaryTuple key = keyExtractor.extractColumns(row);
@@ -218,9 +207,8 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(old == null);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> 
insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+    public CompletableFuture<List<BinaryRow>> 
insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -233,7 +221,6 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(skipped);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable 
InternalTransaction tx) {
         BinaryTuple key = keyExtractor.extractColumns(row);
@@ -241,7 +228,6 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(replaceImpl(key, row, tx) != null);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx 
newRow, @Nullable InternalTransaction tx) {
         BinaryTuple key = keyExtractor.extractColumns(oldRow);
@@ -275,7 +261,6 @@ public class FakeInternalTable implements InternalTable {
         return old;
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row, 
@Nullable InternalTransaction tx) {
         BinaryTuple key = keyExtractor.extractColumns(row);
@@ -287,7 +272,6 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(replace);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable 
InternalTransaction tx) {
         BinaryRow old = getImpl(keyRow.tupleSlice(), keyRow);
@@ -300,7 +284,6 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(old != null);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, 
@Nullable InternalTransaction tx) {
         var res = false;
@@ -318,7 +301,6 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(res);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row, 
@Nullable InternalTransaction tx) {
         BinaryRow old = getImpl(row.tupleSlice(), row);
@@ -331,9 +313,8 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(old);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> 
deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+    public CompletableFuture<List<BinaryRow>> 
deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -346,9 +327,8 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(skipped);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> 
deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+    public CompletableFuture<List<BinaryRow>> 
deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -434,42 +414,35 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
-    /** {@inheritDoc} */
     @Override
     public List<String> assignments() {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<List<PrimaryReplica>> primaryReplicas() {
         return CompletableFuture.failedFuture(new IgniteInternalException(new 
OperationNotSupportedException()));
     }
 
-    /** {@inheritDoc} */
     @Override
     public ClusterNode leaderAssignment(int partition) {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
-    /** {@inheritDoc} */
     @Override
     public RaftGroupService partitionRaftGroupService(int partition) {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override public TxStateTableStorage txStateStorage() {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public int partition(BinaryRowEx keyRow) {
         return 0;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
         // No-op.
diff --git 
a/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp 
b/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp
index 884d1e2ad2..3073c3db38 100644
--- a/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/key_value_binary_view_test.cpp
@@ -860,12 +860,11 @@ TEST_F(key_value_binary_view_test, 
remove_all_nonexisting_keys_return_all) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     EXPECT_EQ(1, res[0].column_count());
-    EXPECT_EQ(2, res[0].get<int64_t>("key"));
+    EXPECT_EQ(1, res[0].get<int64_t>("key"));
 
     EXPECT_EQ(1, res[1].column_count());
-    EXPECT_EQ(1, res[1].get<int64_t>("key"));
+    EXPECT_EQ(2, res[1].get<int64_t>("key"));
 }
 
 TEST_F(key_value_binary_view_test, remove_all_only_existing) {
@@ -896,12 +895,11 @@ TEST_F(key_value_binary_view_test, remove_all_overlapped) 
{
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     EXPECT_EQ(1, res[0].column_count());
-    EXPECT_EQ(12, res[0].get<int64_t>("key"));
+    EXPECT_EQ(11, res[0].get<int64_t>("key"));
 
     EXPECT_EQ(1, res[1].column_count());
-    EXPECT_EQ(11, res[1].get<int64_t>("key"));
+    EXPECT_EQ(12, res[1].get<int64_t>("key"));
 }
 
 TEST_F(key_value_binary_view_test, remove_all_empty) {
@@ -912,8 +910,15 @@ TEST_F(key_value_binary_view_test, remove_all_empty) {
 TEST_F(key_value_binary_view_test, remove_all_exact_nonexisting) {
     auto res = kv_view.remove_all(nullptr, {{get_tuple(1), get_tuple("foo")}, 
{get_tuple(2), get_tuple("bar")}});
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     ASSERT_EQ(2, res.size());
+
+    EXPECT_EQ(2, res[0].column_count());
+    EXPECT_EQ(1, res[0].get<int64_t>("key"));
+    EXPECT_EQ("foo", res[0].get<std::string>("val"));
+
+    EXPECT_EQ(2, res[1].column_count());
+    EXPECT_EQ(2, res[1].get<int64_t>("key"));
+    EXPECT_EQ("bar", res[1].get<std::string>("val"));
 }
 
 TEST_F(key_value_binary_view_test, remove_all_exact_overlapped) {
diff --git a/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp 
b/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp
index 98ee0264b0..39b13a8238 100644
--- a/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/key_value_view_test.cpp
@@ -699,9 +699,8 @@ TEST_F(key_value_view_test, 
remove_all_nonexisting_keys_return_all) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
-    EXPECT_EQ(2, res[0].key);
-    EXPECT_EQ(1, res[1].key);
+    EXPECT_EQ(1, res[0].key);
+    EXPECT_EQ(2, res[1].key);
 }
 
 TEST_F(key_value_view_test, remove_all_only_existing) {
@@ -732,9 +731,8 @@ TEST_F(key_value_view_test, remove_all_overlapped) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
-    EXPECT_EQ(12, res[0].key);
-    EXPECT_EQ(11, res[1].key);
+    EXPECT_EQ(11, res[0].key);
+    EXPECT_EQ(12, res[1].key);
 }
 
 TEST_F(key_value_view_test, remove_all_empty) {
@@ -746,8 +744,10 @@ TEST_F(key_value_view_test, remove_all_exact_nonexisting) {
     auto res = kv_view.remove_all(
         nullptr, {{test_key_type(1), test_value_type("foo")}, 
{test_key_type(2), test_value_type("bar")}});
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     ASSERT_EQ(2, res.size());
+
+    EXPECT_EQ(1, res[0].key);
+    EXPECT_EQ(2, res[1].key);
 }
 
 TEST_F(key_value_view_test, remove_all_exact_overlapped) {
diff --git 
a/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp 
b/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp
index dbdb2c4eaf..589072567e 100644
--- a/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/record_binary_view_test.cpp
@@ -862,12 +862,11 @@ TEST_F(record_binary_view_test, 
remove_all_nonexisting_keys_return_all) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     EXPECT_EQ(1, res[0].column_count());
-    EXPECT_EQ(2, res[0].get<int64_t>("key"));
+    EXPECT_EQ(1, res[0].get<int64_t>("key"));
 
     EXPECT_EQ(1, res[1].column_count());
-    EXPECT_EQ(1, res[1].get<int64_t>("key"));
+    EXPECT_EQ(2, res[1].get<int64_t>("key"));
 }
 
 TEST_F(record_binary_view_test, remove_all_only_existing) {
@@ -897,12 +896,11 @@ TEST_F(record_binary_view_test, remove_all_overlapped) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     EXPECT_EQ(1, res[0].column_count());
-    EXPECT_EQ(12, res[0].get<int64_t>("key"));
+    EXPECT_EQ(11, res[0].get<int64_t>("key"));
 
     EXPECT_EQ(1, res[1].column_count());
-    EXPECT_EQ(11, res[1].get<int64_t>("key"));
+    EXPECT_EQ(12, res[1].get<int64_t>("key"));
 }
 
 TEST_F(record_binary_view_test, remove_all_empty) {
@@ -913,8 +911,15 @@ TEST_F(record_binary_view_test, remove_all_empty) {
 TEST_F(record_binary_view_test, remove_all_exact_nonexisting) {
     auto res = tuple_view.remove_all_exact(nullptr, {get_tuple(1, "foo"), 
get_tuple(2, "bar")});
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     ASSERT_EQ(2, res.size());
+
+    EXPECT_EQ(2, res[0].column_count());
+    EXPECT_EQ(1, res[0].get<int64_t>("key"));
+    EXPECT_EQ("foo", res[0].get<std::string>("val"));
+
+    EXPECT_EQ(2, res[1].column_count());
+    EXPECT_EQ(2, res[1].get<int64_t>("key"));
+    EXPECT_EQ("bar", res[1].get<std::string>("val"));
 }
 
 TEST_F(record_binary_view_test, remove_all_exact_overlapped) {
diff --git a/modules/platforms/cpp/tests/client-test/record_view_test.cpp 
b/modules/platforms/cpp/tests/client-test/record_view_test.cpp
index 232c9df42c..d4d3318fa9 100644
--- a/modules/platforms/cpp/tests/client-test/record_view_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/record_view_test.cpp
@@ -920,9 +920,8 @@ TEST_F(record_view_test, 
remove_all_nonexisting_keys_return_all) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
-    EXPECT_EQ(2, res[0].key);
-    EXPECT_EQ(1, res[1].key);
+    EXPECT_EQ(1, res[0].key);
+    EXPECT_EQ(2, res[1].key);
 }
 
 TEST_F(record_view_test, remove_all_only_existing) {
@@ -952,9 +951,8 @@ TEST_F(record_view_test, remove_all_overlapped) {
 
     EXPECT_EQ(res.size(), 2);
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
-    EXPECT_EQ(12, res[0].key);
-    EXPECT_EQ(11, res[1].key);
+    EXPECT_EQ(11, res[0].key);
+    EXPECT_EQ(12, res[1].key);
 }
 
 TEST_F(record_view_test, remove_all_empty) {
@@ -965,8 +963,10 @@ TEST_F(record_view_test, remove_all_empty) {
 TEST_F(record_view_test, remove_all_exact_nonexisting) {
     auto res = view.remove_all_exact(nullptr, {test_type(1, "foo"), 
test_type(2, "bar")});
 
-    // TODO: Key order should be preserved by the server (IGNITE-20435).
     ASSERT_EQ(2, res.size());
+
+    EXPECT_EQ(1, res[0].key);
+    EXPECT_EQ(2, res[1].key);
 }
 
 TEST_F(record_view_test, remove_all_exact_overlapped) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
similarity index 68%
rename from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
rename to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
index cab67eedc0..9a7503e73d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
@@ -27,23 +27,26 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.Column;
@@ -71,12 +74,10 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
- * Tests for the read-only API.
+ * Tests for the internal table API.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-public class ItRoReadsTest extends BaseIgniteAbstractTest {
-    private static final IgniteLogger LOG = 
Loggers.forClass(ItRoReadsTest.class);
-
+public class ItInternalTableTest extends BaseIgniteAbstractTest {
     private static final String TABLE_NAME = "SOME_TABLE";
 
     private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor(
@@ -133,13 +134,9 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
     @AfterAll
     static void stopNode(TestInfo testInfo) throws Exception {
-        LOG.info("Start tearDown()");
-
         NODE = null;
 
         IgniteUtils.closeAll(() -> IgnitionManager.stop(testNodeName(testInfo, 
0)));
-
-        LOG.info("End tearDown()");
     }
 
     @BeforeEach
@@ -235,34 +232,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         tx2.commit();
 
-        Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), 
node.node());
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        List<BinaryRow> list = new ArrayList<>();
-
-        res.subscribe(new Subscriber<>() {
-            @Override
-            public void onSubscribe(Subscription subscription) {
-                subscription.request(100);
-            }
-
-            @Override
-            public void onNext(BinaryRow item) {
-                list.add(item);
-            }
-
-            @Override
-            public void onError(Throwable throwable) {
-            }
-
-            @Override
-            public void onComplete() {
-                latch.countDown();
-            }
-        });
-
-        latch.await();
+        List<BinaryRow> list = scanAllPartitions(node);
 
         assertThat(list, contains(equalToRow(keyValueRow2)));
     }
@@ -384,78 +354,206 @@ public class ItRoReadsTest extends 
BaseIgniteAbstractTest {
         roScanAll(false);
     }
 
-    private void roScanAll(boolean implicit) throws InterruptedException {
-        IgniteImpl node = node();
+    @Test
+    public void getAllOrderTest() {
+        List<BinaryRowEx> keyRows = 
populateEvenKeysAndPrepareEntriesToLookup(true);
 
         InternalTable internalTable = ((TableImpl) table).internalTable();
+        SchemaDescriptor schemaDescriptor = ((TableImpl) 
table).schemaView().schema();
 
-        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+        CompletableFuture<List<BinaryRow>> getAllFut = 
internalTable.getAll(keyRows, null);
 
-        Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), 
node.node());
+        assertThat(getAllFut, willCompleteSuccessfully());
 
-        var subscriberAllDataAwaitLatch = new CountDownLatch(1);
+        List<BinaryRow> res = getAllFut.join();
 
-        var retrievedItems = new ArrayList<BinaryRow>();
+        assertEquals(keyRows.size(), res.size());
 
-        res.subscribe(new Subscriber<>() {
-            @Override
-            public void onSubscribe(Subscription subscription) {
-                subscription.request(10000);
-            }
+        Iterator<BinaryRow> resIter = res.iterator();
 
-            @Override
-            public void onNext(BinaryRow item) {
-                retrievedItems.add(item);
-            }
+        for (BinaryRowEx key : keyRows) {
+            int i = 
TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, 
key)).<Long>value("key").intValue();
 
-            @Override
-            public void onError(Throwable throwable) {
-                fail("onError call is not expected.");
-            }
+            BinaryRow resRow = resIter.next();
+
+            if (i % 2 == 1) {
+                assertNull(resRow);
+            } else {
+                assertNotNull(resRow);
+
+                Tuple rowTuple = 
TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resRow));
 
-            @Override
-            public void onComplete() {
-                subscriberAllDataAwaitLatch.countDown();
+                assertEquals(i % 100L, rowTuple.<Long>value("key"));
+                assertEquals(i, rowTuple.<Integer>value("valInt"));
+                assertEquals("some string row" + i, 
rowTuple.<Integer>value("valStr"));
             }
-        });
+        }
+    }
 
-        subscriberAllDataAwaitLatch.await();
+    @Test
+    public void deleteAllOrderTest() {
+        List<BinaryRowEx> keyRows = 
populateEvenKeysAndPrepareEntriesToLookup(true);
 
-        assertEquals(0, retrievedItems.size());
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+        SchemaDescriptor schemaDescriptor = ((TableImpl) 
table).schemaView().schema();
 
-        populateData(node, keyValueView, implicit);
+        CompletableFuture<List<BinaryRow>> deleteAllFut = 
internalTable.deleteAll(keyRows, null);
+
+        assertThat(deleteAllFut, willCompleteSuccessfully());
+
+        List<BinaryRow> res = deleteAllFut.join();
 
-        res = internalTable.scan(0, node.clock().now(), node.node());
+        Iterator<BinaryRow> resIter = res.iterator();
 
-        var subscriberAllDataAwaitLatch2 = new CountDownLatch(1);
+        for (BinaryRowEx key : keyRows) {
+            int i = 
TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, 
key)).<Long>value("key").intValue();
 
-        res.subscribe(new Subscriber<>() {
-            @Override
-            public void onSubscribe(Subscription subscription) {
-                subscription.request(10000);
+            if (i % 2 == 1) {
+                Tuple rowTuple = 
TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, resIter.next()));
+
+                assertEquals(i % 100L, rowTuple.<Long>value("key"));
             }
+        }
+    }
 
-            @Override
-            public void onNext(BinaryRow item) {
-                retrievedItems.add(item);
+    @Test
+    public void deleteAllExactOrderTest() {
+        List<BinaryRowEx> rowsToLookup = 
populateEvenKeysAndPrepareEntriesToLookup(false);
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+        SchemaDescriptor schemaDescriptor = ((TableImpl) 
table).schemaView().schema();
+
+        CompletableFuture<List<BinaryRow>> deleteAllExactFut = 
internalTable.deleteAllExact(rowsToLookup, null);
+
+        assertThat(deleteAllExactFut, willCompleteSuccessfully());
+
+        List<BinaryRow> res = deleteAllExactFut.join();
+
+        Iterator<BinaryRow> resIter = res.iterator();
+
+        for (BinaryRowEx key : rowsToLookup) {
+            int i = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, 
key)).<Long>value("key").intValue();
+
+            if (i % 2 == 1) {
+                Tuple rowTuple = 
TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next()));
+
+                assertEquals(i % 100L, rowTuple.<Long>value("key"));
+                assertEquals(i, rowTuple.<Integer>value("valInt"));
+                assertEquals("some string row" + i, 
rowTuple.<Integer>value("valStr"));
             }
+        }
+    }
 
-            @Override
-            public void onError(Throwable throwable) {
-                fail("onError call is not expected.");
+    @Test
+    public void insertAllOrderTest() {
+        List<BinaryRowEx> rowsToLookup = 
populateEvenKeysAndPrepareEntriesToLookup(false);
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+        SchemaDescriptor schemaDescriptor = ((TableImpl) 
table).schemaView().schema();
+
+        CompletableFuture<List<BinaryRow>> insertAllFut = 
internalTable.insertAll(rowsToLookup, null);
+
+        assertThat(insertAllFut, willCompleteSuccessfully());
+
+        List<BinaryRow> res = insertAllFut.join();
+
+        Iterator<BinaryRow> resIter = res.iterator();
+
+        for (BinaryRowEx key : rowsToLookup) {
+            int i = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, 
key)).<Long>value("key").intValue();
+
+            if (i % 2 == 0) {
+                Tuple rowTuple = 
TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next()));
+
+                assertEquals(i % 100L, rowTuple.<Long>value("key"));
+                assertEquals(i, rowTuple.<Integer>value("valInt"));
+                assertEquals("some string row" + i, 
rowTuple.<Integer>value("valStr"));
             }
+        }
+    }
 
-            @Override
-            public void onComplete() {
-                subscriberAllDataAwaitLatch2.countDown();
+    private ArrayList<BinaryRowEx> 
populateEvenKeysAndPrepareEntriesToLookup(boolean keyOnly) {
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        var keyRows = new ArrayList<BinaryRowEx>();
+
+        for (int i = 0; i < 15; i++) {
+            keyRows.add(keyOnly ? createKeyRow(i) : createKeyValueRow(i, i, 
"some string row" + i));
+
+            if (i % 2 == 0) {
+                putValue(keyValueView, i);
             }
-        });
+        }
+
+        Collections.shuffle(keyRows);
 
-        subscriberAllDataAwaitLatch2.await();
+        return keyRows;
+    }
+
+    private void roScanAll(boolean implicit) throws InterruptedException {
+        IgniteImpl node = node();
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        List<BinaryRow> retrievedItems = scanAllPartitions(node);
+
+        assertEquals(0, retrievedItems.size());
+
+        populateData(node, keyValueView, implicit);
+
+        retrievedItems = scanAllPartitions(node);
 
         assertEquals(15, retrievedItems.size());
     }
 
+
+    /**
+     * Scans all table entries.
+     *
+     * @param node Ignite instance.
+     * @return Collection with all rows.
+     * @throws InterruptedException If fail.
+     */
+    private static List<BinaryRow> scanAllPartitions(IgniteImpl node) throws 
InterruptedException {
+        InternalTable internalTable = ((TableImpl) 
node.tables().table(TABLE_NAME)).internalTable();
+
+        List<BinaryRow> retrievedItems = new CopyOnWriteArrayList<>();
+
+        int parts = internalTable.partitions();
+
+        var subscriberAllDataAwaitLatch = new CountDownLatch(parts);
+
+        for (int i = 0; i < parts; i++) {
+            Publisher<BinaryRow> res = internalTable.scan(i, 
node.clock().now(), node.node());
+
+            res.subscribe(new Subscriber<>() {
+                @Override
+                public void onSubscribe(Subscription subscription) {
+                    subscription.request(10000);
+                }
+
+                @Override
+                public void onNext(BinaryRow item) {
+                    retrievedItems.add(item);
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    fail("onError call is not expected.");
+                }
+
+                @Override
+                public void onComplete() {
+                    subscriberAllDataAwaitLatch.countDown();
+                }
+            });
+        }
+
+        assertTrue(subscriberAllDataAwaitLatch.await(10, TimeUnit.SECONDS));
+
+        return retrievedItems;
+    }
+
     private static Row createKeyValueRow(long id, int value, String str) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA_1, -1);
 
@@ -506,7 +604,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
         String zoneName = zoneNameForTable(tableName);
 
         try (Session session = node.sql().createSession()) {
-            session.execute(null, String.format("create zone \"%s\" with 
partitions=1, replicas=%d", zoneName, DEFAULT_REPLICA_COUNT));
+            session.execute(null, String.format("create zone \"%s\" with 
partitions=3, replicas=%d", zoneName, DEFAULT_REPLICA_COUNT));
 
             session.execute(null,
                     String.format(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 44d54ab469..d1b92dbe4f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
+import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectRejectedRowsResponsesWithRestoreOrder;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR;
 
@@ -54,6 +55,7 @@ import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMess
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.table.distributed.storage.RowBatch;
 import org.apache.ignite.internal.type.NativeTypeSpec;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.SqlException;
@@ -251,34 +253,23 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
 
         RowHandler<RowT> handler = ectx.rowHandler();
 
-        Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new 
Int2ObjectOpenHashMap<>();
+        Int2ObjectMap<RowBatch> rowBatchByPartitionId = 
toRowBatchByPartitionId(handler, rows);
 
-        for (RowT row : rows) {
-            BinaryRowEx binaryRow = convertRow(row, ectx.rowHandler());
+        for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : 
rowBatchByPartitionId.int2ObjectEntrySet()) {
+            int partitionId = partitionRowBatch.getIntKey();
+            RowBatch rowBatch = partitionRowBatch.getValue();
 
-            
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new 
ArrayList<>()).add(binaryRow);
-        }
-
-        CompletableFuture<List<RowT>>[] futures = new 
CompletableFuture[rowsByPartition.size()];
-
-        int batchNum = 0;
-
-        IgniteTypeFactory typeFactory = ectx.getTypeFactory();
-        RowSchema rowSchema = 
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(desc.insertRowType(typeFactory)));
-        RowHandler.RowFactory<RowT> rowFactory = handler.factory(rowSchema);
-
-        for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows : 
rowsByPartition.int2ObjectEntrySet()) {
-            TablePartitionId partGroupId = new TablePartitionId(tableId, 
partToRows.getIntKey());
+            TablePartitionId partGroupId = new TablePartitionId(tableId, 
partitionId);
             ColocationGroup group = ectx.group(MODIFY_NODE_SOURCE_ID);
 
             assert group != null;
 
-            NodeWithTerm nodeWithTerm = 
group.assignments().get(partToRows.getIntKey());
+            NodeWithTerm nodeWithTerm = group.assignments().get(partitionId);
 
             ReadWriteMultiRowReplicaRequest request = 
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
-                    
.binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
+                    
.binaryRowMessages(serializeBinaryRows(rowBatch.requestedRows))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_INSERT_ALL)
@@ -286,25 +277,30 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
                     .skipDelayedAck(true)
                     .build();
 
-            futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(), 
request)
-                    .thenApply(result -> {
-                        Collection<BinaryRow> binaryRows = 
(Collection<BinaryRow>) result;
+            rowBatch.resultFuture = replicaService.invoke(nodeWithTerm.name(), 
request);
+        }
 
-                        if (binaryRows.isEmpty()) {
-                            return List.of();
-                        }
+        return handleInsertResults(ectx, rowBatchByPartitionId.values());
+    }
 
-                        List<RowT> conflictRows = new 
ArrayList<>(binaryRows.size());
+    /**
+     * Creates batches of rows for processing, grouped by partition ID.
+     *
+     * @param rowHandler Row handler.
+     * @param rows Rows.
+     */
+    private <T> Int2ObjectMap<RowBatch> toRowBatchByPartitionId(RowHandler<T> 
rowHandler, List<T> rows) {
+        Int2ObjectMap<RowBatch> rowBatchByPartitionId = new 
Int2ObjectOpenHashMap<>();
 
-                        for (BinaryRow row : binaryRows) {
-                            conflictRows.add(rowConverter.toRow(ectx, row, 
rowFactory));
-                        }
+        int i = 0;
 
-                        return conflictRows;
-                    });
+        for (T row : rows) {
+            BinaryRowEx binaryRow = convertRow(row, rowHandler);
+
+            
rowBatchByPartitionId.computeIfAbsent(partitionExtractor.fromRow(binaryRow), 
partitionId -> new RowBatch()).add(binaryRow, i++);
         }
 
-        return handleInsertResults(handler, futures);
+        return rowBatchByPartitionId;
     }
 
     /** {@inheritDoc} */
@@ -387,35 +383,28 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
         rowAssembler.appendValue(val);
     }
 
-    private static <RowT> CompletableFuture<List<RowT>> handleInsertResults(
-            RowHandler<RowT> handler,
-            CompletableFuture<List<RowT>>[] futs
+    private <RowT> CompletableFuture<List<RowT>> handleInsertResults(
+            ExecutionContext<RowT> ectx,
+            Collection<RowBatch> batches
     ) {
-        return CompletableFuture.allOf(futs)
+        return collectRejectedRowsResponsesWithRestoreOrder(batches)
                 .thenApply(response -> {
-                    List<String> conflictRows = null;
-
-                    for (CompletableFuture<List<RowT>> future : futs) {
-                        List<RowT> values = future.join();
-
-                        if (nullOrEmpty(values)) {
-                            continue;
-                        }
+                    if (nullOrEmpty(response)) {
+                        return null;
+                    }
 
-                        if (conflictRows == null) {
-                            conflictRows = new ArrayList<>(values.size());
-                        }
+                    RowHandler<RowT> handler = ectx.rowHandler();
+                    IgniteTypeFactory typeFactory = ectx.getTypeFactory();
+                    RowSchema rowSchema = 
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(desc.insertRowType(typeFactory)));
+                    RowHandler.RowFactory<RowT> rowFactory = 
handler.factory(rowSchema);
 
-                        for (RowT row : values) {
-                            conflictRows.add(handler.toString(row));
-                        }
-                    }
+                    ArrayList<String> conflictRows = new 
ArrayList<>(response.size());
 
-                    if (conflictRows != null) {
-                        throw conflictKeysException(conflictRows);
+                    for (BinaryRow row : response) {
+                        
conflictRows.add(handler.toString(rowConverter.toRow(ectx, row, rowFactory)));
                     }
 
-                    return null;
+                    throw conflictKeysException(conflictRows);
                 });
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 4d870688f4..e7afd3ee1d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NullBinaryRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
@@ -183,7 +184,9 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 });
 
                 if (cmd instanceof UpdateAllCommand) {
-                    return completedFuture(List.of());
+                    return completedFuture(((UpdateAllCommand) 
cmd).rowsToUpdate().keySet().stream()
+                            .map(uuid -> new NullBinaryRow())
+                            .collect(Collectors.toList()));
                 } else {
                     return completedFuture(true);
                 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 9c2e2d4fe9..1b588578da 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -178,10 +178,12 @@ public interface InternalTable extends ManuallyCloseable {
      * Asynchronously insert rows into the table which do not exist, skipping 
existed ones.
      *
      * @param rows Rows to insert into the table.
-     * @param tx   The transaction.
-     * @return Future representing pending completion of the operation.
+     * @param tx The transaction.
+     * @return Future represents the pending completion of the operation, with 
rejected rows for insertion in the result. The order of
+     *         collection elements is guaranteed to be the same as the order 
of {@code rows}. If a record is inserted, the element will be
+     *         excluded from the collection result.
      */
-    CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> 
rows, @Nullable InternalTransaction tx);
+    CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, 
@Nullable InternalTransaction tx);
 
     /**
      * Asynchronously replaces an existed row associated with the same key 
columns values as the given one has.
@@ -243,19 +245,23 @@ public interface InternalTable extends ManuallyCloseable {
      * Asynchronously remove rows with the same key columns values as the 
given one has from the table.
      *
      * @param rows Rows with key columns set.
-     * @param tx   The transaction.
-     * @return Future representing pending completion of the operation.
+     * @param tx The transaction.
+     * @return Future represents the pending completion of the operation, with 
rejected rows for deletion in the result. The order of
+     *         collection elements is guaranteed to be the same as the order 
of {@code rows}. If a record is deleted, the element will be
+     *         excluded from the collection result.
      */
-    CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> 
rows, @Nullable InternalTransaction tx);
+    CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, 
@Nullable InternalTransaction tx);
 
     /**
      * Asynchronously remove given rows from the table.
      *
      * @param rows Rows to delete.
-     * @param tx   The transaction.
-     * @return Future representing pending completion of the operation.
+     * @param tx The transaction.
+     * @return Future represents the pending completion of the operation, with 
rejected rows for deletion in the result. The order of
+     *         collection elements is guaranteed to be the same as the order 
of {@code rows}. If a record is deleted, the element will be
+     *         excluded from the collection result.
      */
-    CompletableFuture<Collection<BinaryRow>> 
deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx);
+    CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> 
rows, @Nullable InternalTransaction tx);
 
     /**
      * Returns a partition for a key.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 6920f5fe92..7284c12a2b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -176,13 +176,13 @@ public class RecordBinaryViewImpl extends 
AbstractTableView implements RecordVie
 
     /** {@inheritDoc} */
     @Override
-    public Collection<Tuple> insertAll(@Nullable Transaction tx, 
Collection<Tuple> recs) {
+    public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> 
recs) {
         return sync(insertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
+    public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction 
tx, Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
         return withSchemaSync(tx, (schemaVersion) -> {
@@ -303,13 +303,13 @@ public class RecordBinaryViewImpl extends 
AbstractTableView implements RecordVie
 
     /** {@inheritDoc} */
     @Override
-    public Collection<Tuple> deleteAll(@Nullable Transaction tx, 
Collection<Tuple> keyRecs) {
+    public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> 
keyRecs) {
         return sync(deleteAllAsync(tx, keyRecs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable 
Transaction tx, Collection<Tuple> keyRecs) {
+    public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction 
tx, Collection<Tuple> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
         return withSchemaSync(tx, (schemaVersion) -> {
@@ -320,13 +320,13 @@ public class RecordBinaryViewImpl extends 
AbstractTableView implements RecordVie
 
     /** {@inheritDoc} */
     @Override
-    public Collection<Tuple> deleteAllExact(@Nullable Transaction tx, 
Collection<Tuple> recs) {
+    public List<Tuple> deleteAllExact(@Nullable Transaction tx, 
Collection<Tuple> recs) {
         return sync(deleteAllExactAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
+    public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
         return withSchemaSync(tx, (schemaVersion) -> {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 27af10db67..4e680c6b65 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -171,13 +171,13 @@ public class RecordViewImpl<R> extends AbstractTableView 
implements RecordView<R
 
     /** {@inheritDoc} */
     @Override
-    public Collection<R> insertAll(@Nullable Transaction tx, Collection<R> 
recs) {
+    public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) {
         return sync(insertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<R>> insertAllAsync(@Nullable 
Transaction tx, Collection<R> recs) {
+    public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, 
Collection<R> recs) {
         Objects.requireNonNull(recs);
 
         return withSchemaSync(tx, (schemaVersion) -> {
@@ -299,13 +299,13 @@ public class RecordViewImpl<R> extends AbstractTableView 
implements RecordView<R
 
     /** {@inheritDoc} */
     @Override
-    public Collection<R> deleteAll(@Nullable Transaction tx, Collection<R> 
keyRecs) {
+    public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) {
         return sync(deleteAllAsync(tx, keyRecs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<R>> deleteAllAsync(@Nullable 
Transaction tx, Collection<R> keyRecs) {
+    public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
         return withSchemaSync(tx, (schemaVersion) -> {
@@ -317,13 +317,13 @@ public class RecordViewImpl<R> extends AbstractTableView 
implements RecordView<R
 
     /** {@inheritDoc} */
     @Override
-    public Collection<R> deleteAllExact(@Nullable Transaction tx, 
Collection<R> recs) {
+    public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> 
recs) {
         return sync(deleteAllExactAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<R> keyRecs) {
+    public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
         return withSchemaSync(tx, (schemaVersion) -> {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 549499fd3e..3a58e43c80 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1772,6 +1772,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                 return allOf(deleteExactLockFuts).thenCompose(ignore -> {
                     Map<UUID, BinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
+                    // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
 
                     for (int i = 0; i < searchRows.size(); i++) {
@@ -1779,8 +1780,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                         if (lockedRowId != null) {
                             rowIdsToDelete.put(lockedRowId.uuid(), null);
+
+                            result.add(new NullBinaryRow());
                         } else {
-                            result.add(searchRows.get(i));
+                            result.add(null);
                         }
                     }
 
@@ -1815,6 +1818,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(pkReadLockFuts).thenCompose(ignore -> {
+                    // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
                     Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
                     Set<ByteBuffer> uniqueKeys = new HashSet<>();
@@ -1825,8 +1829,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                         if (lockedRow == null && 
uniqueKeys.add(pks.get(i).byteBuffer())) {
                             rowsToInsert.put(new RowId(partId(), 
UUID.randomUUID()), row);
+
+                            result.add(new NullBinaryRow());
                         } else {
-                            result.add(row);
+                            result.add(null);
                         }
                     }
 
@@ -2007,6 +2013,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                 return allOf(rowIdLockFuts).thenCompose(ignore -> {
                     Map<UUID, BinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
+                    // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
 
                     for (CompletableFuture<RowId> lockFut : rowIdLockFuts) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index a5f84c2a2f..cea79a364e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -38,7 +38,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -94,7 +93,6 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.Multiple
 import 
org.apache.ignite.internal.table.distributed.replication.request.MultipleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
-import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
@@ -475,7 +473,7 @@ public class InternalTableImpl implements InternalTable {
                                 if (e2 != null) {
                                     return result.completeExceptionally(e2);
                                 } else {
-                                    return result.complete((R) r2);
+                                    return result.complete(r2);
                                 }
                             });
                         }
@@ -992,7 +990,7 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> 
insertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
+    public CompletableFuture<List<BinaryRow>> 
insertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
         return enlistInTx(
                 rows,
                 tx,
@@ -1006,11 +1004,16 @@ public class InternalTableImpl implements InternalTable 
{
                         .timestampLong(clock.nowLong())
                         .full(full)
                         .build(),
-                
InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder,
+                
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
-                    ReadWriteMultiRowReplicaRequest r = 
(ReadWriteMultiRowReplicaRequest) req;
+                    for (BinaryRow row : res) {
+                        if (row != null) {
+                            return false;
+                        }
+                    }
 
-                    return res.size() == r.binaryRowMessages().size();
+                    // All values are null, this means nothing was deleted.
+                    return true;
                 }
         );
     }
@@ -1138,7 +1141,7 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> 
deleteAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
+    public CompletableFuture<List<BinaryRow>> 
deleteAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
         return enlistInTx(
                 rows,
                 tx,
@@ -1152,24 +1155,7 @@ public class InternalTableImpl implements InternalTable {
                         .timestampLong(clock.nowLong())
                         .full(full)
                         .build(),
-                rowBatches -> allResultFutures(rowBatches).thenApply(v -> {
-                    List<BinaryRow> result = new ArrayList<>();
-
-                    for (RowBatch batch : rowBatches) {
-                        List<BinaryRow> requestedRows = batch.requestedRows;
-                        List<BinaryRow> response = (List<BinaryRow>) 
batch.resultFuture.join();
-
-                        assert requestedRows.size() == response.size();
-
-                        for (int i = 0; i < requestedRows.size(); i++) {
-                            if (response.get(i) == null) {
-                                result.add(requestedRows.get(i));
-                            }
-                        }
-                    }
-
-                    return result;
-                }),
+                
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
                     for (BinaryRow row : res) {
                         if (row != null) {
@@ -1185,7 +1171,7 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> deleteAllExact(
+    public CompletableFuture<List<BinaryRow>> deleteAllExact(
             Collection<BinaryRowEx> rows,
             InternalTransaction tx
     ) {
@@ -1202,11 +1188,16 @@ public class InternalTableImpl implements InternalTable 
{
                         .timestampLong(clock.nowLong())
                         .full(full)
                         .build(),
-                
InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder,
+                
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                 (res, req) -> {
-                    ReadWriteMultiRowReplicaRequest r = 
(ReadWriteMultiRowReplicaRequest) req;
+                    for (BinaryRow row : res) {
+                        if (row != null) {
+                            return false;
+                        }
+                    }
 
-                    return res.size() == r.binaryRowMessages().size();
+                    // All values are null, this means nothing was deleted.
+                    return true;
                 }
         );
     }
@@ -1554,44 +1545,66 @@ public class InternalTableImpl implements InternalTable 
{
     }
 
     /**
-     * Gathers the result of batch processing into a single resulting 
collection of rows.
+     * Gets a batch result.
      *
      * @param rowBatches Row batches.
      * @return Future of collecting results.
      */
-    static CompletableFuture<Collection<BinaryRow>> 
collectMultiRowsResponsesWithoutRestoreOrder(Collection<RowBatch> rowBatches) {
-        return allResultFutures(rowBatches)
-                .thenApply(response -> {
-                    var result = new ArrayList<BinaryRow>(rowBatches.size());
-
-                    for (RowBatch rowBatch : rowBatches) {
-                        Collection<BinaryRow> batchResult = 
(Collection<BinaryRow>) rowBatch.getCompletedResult();
+    public static CompletableFuture<List<BinaryRow>> 
collectRejectedRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) {
+        return collectMultiRowsResponsesWithRestoreOrder(
+                rowBatches,
+                batch -> {
+                    List<BinaryRow> result = new ArrayList<>();
+                    List<BinaryRow> response = (List<BinaryRow>) 
batch.getCompletedResult();
 
-                        if (batchResult == null) {
-                            continue;
-                        }
+                    assert batch.requestedRows.size() == response.size() :
+                            "Replication response does not fit to request 
[requestRows=" + batch.requestedRows.size()
+                                    + "responseRows=" + response.size() + ']';
 
-                        result.addAll(batchResult);
+                    for (int i = 0; i < response.size(); i++) {
+                        result.add(response.get(i) != null ? null : 
batch.requestedRows.get(i));
                     }
 
                     return result;
-                });
+                },
+                true
+        );
+    }
+
+    /**
+     * Gets a batch result.
+     *
+     * @param rowBatches Row batches.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<List<BinaryRow>> 
collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) {
+        return collectMultiRowsResponsesWithRestoreOrder(
+                rowBatches,
+                batch -> (Collection<BinaryRow>) batch.getCompletedResult(),
+                false
+        );
     }
 
     /**
      * Gathers the result of batch processing into a single resulting 
collection of rows, restoring order as in the requested collection of
      * rows.
      *
-     * @param rowBatches Row batches by partition ID.
+     * @param rowBatches Row batches.
+     * @param bathResultMapper Map a batch to the result collection of binary 
rows.
+     * @param skipNull True to skip the null in result collection, false 
otherwise.
      * @return Future of collecting results.
      */
-    static CompletableFuture<List<BinaryRow>> 
collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) {
+    private static CompletableFuture<List<BinaryRow>> 
collectMultiRowsResponsesWithRestoreOrder(
+            Collection<RowBatch> rowBatches,
+            Function<RowBatch, Collection<BinaryRow>> bathResultMapper,
+            boolean skipNull
+    ) {
         return allResultFutures(rowBatches)
                 .thenApply(response -> {
                     var result = new 
BinaryRow[RowBatch.getTotalRequestedRowSize(rowBatches)];
 
                     for (RowBatch rowBatch : rowBatches) {
-                        Collection<BinaryRow> batchResult = 
(Collection<BinaryRow>) rowBatch.getCompletedResult();
+                        Collection<BinaryRow> batchResult = 
bathResultMapper.apply(rowBatch);
 
                         assert batchResult != null;
 
@@ -1605,8 +1618,15 @@ public class InternalTableImpl implements InternalTable {
                         }
                     }
 
-                    // Use Arrays#asList to avoid copying the array.
-                    return Arrays.asList(result);
+                    ArrayList<BinaryRow> resultToReturn = new ArrayList<>();
+
+                    for (BinaryRow row : result) {
+                        if (!skipNull || row != null) {
+                            resultToReturn.add(row);
+                        }
+                    }
+
+                    return resultToReturn;
                 });
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
index 27b97b19f0..0b2a6feea3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
@@ -31,9 +31,9 @@ import org.jetbrains.annotations.Nullable;
  *
  * <p>NOTE: Not thread-safe.
  */
-class RowBatch {
+public class RowBatch {
     /** Batch of rows from the original collection of rows. */
-    final List<BinaryRow> requestedRows = new ArrayList<>();
+    public final List<BinaryRow> requestedRows = new ArrayList<>();
 
     /** Order of the rows from the {@link #requestedRows} in the original row 
collection. */
     final IntList originalRowOrder = new IntArrayList();
@@ -41,9 +41,9 @@ class RowBatch {
     /**
      * Future of the result of processing the {@link #requestedRows}, {@code 
null} if not set and may return {@code null}.
      */
-    @Nullable CompletableFuture<?> resultFuture;
+    public @Nullable CompletableFuture<?> resultFuture;
 
-    void add(BinaryRow row, int originalIndex) {
+    public void add(BinaryRow row, int originalIndex) {
         requestedRows.add(row);
         originalRowOrder.add(originalIndex);
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 8e3fb9740f..f06e86eede 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -19,12 +19,11 @@ package 
org.apache.ignite.internal.table.distributed.storage;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder;
-import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithoutRestoreOrder;
+import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectRejectedRowsResponsesWithRestoreOrder;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -35,12 +34,14 @@ import static org.mockito.Mockito.when;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.NullBinaryRow;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
@@ -151,9 +152,39 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 willBe(equalTo(originalRows))
         );
 
+        var part1 = new ArrayList<>(2);
+
+        part1.add(null);
+        part1.add(new NullBinaryRow());
+
+        rowBatchByPartitionId.get(0).resultFuture = completedFuture(part1);
+
+        var part2 = new ArrayList<>(2);
+
+        part2.add(null);
+
+        rowBatchByPartitionId.get(1).resultFuture = completedFuture(part2);
+
+        var part3 = new ArrayList<>(2);
+
+        part3.add(new NullBinaryRow());
+        part3.add(null);
+        part3.add(new NullBinaryRow());
+
+        rowBatchByPartitionId.get(2).resultFuture = completedFuture(part3);
+
+        List<BinaryRowEx> rejectedRows = List.of(
+                // Rows for 0 partition.
+                originalRows.get(0),
+                // Rows for 1 partition.
+                originalRows.get(2),
+                // Rows for 2 partition.
+                originalRows.get(4)
+        );
+
         assertThat(
-                
collectMultiRowsResponsesWithoutRestoreOrder(rowBatchByPartitionId.values()),
-                willBe(hasItems(originalRows.toArray(BinaryRowEx[]::new)))
+                
collectRejectedRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values()),
+                willBe(equalTo(rejectedRows))
         );
     }
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index abd834852b..c3d191a568 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -218,6 +218,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
      *         ReplicaService by itself.
      * @param transactionStateResolver Transaction state resolver.
      * @param schema Schema descriptor.
+     * @param tracker Observable timestamp tracker.
      */
     public DummyInternalTableImpl(
             ReplicaService replicaSvc,
@@ -242,7 +243,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 tracker,
                 TEST_PLACEMENT_DRIVER
         );
-        RaftGroupService svc = raftGroupServiceByPartitionId.get(0);
+        RaftGroupService svc = raftGroupServiceByPartitionId.get(PART_ID);
 
         groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : 
crossTableGroupId;
 

Reply via email to