vldpyatkov commented on code in PR #6779:
URL: https://github.com/apache/ignite-3/pull/6779#discussion_r2524820346


##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java:
##########
@@ -74,53 +77,42 @@ public Tuple tuple2() {
         return tuple2;
     }
 
-    public static CompletableFuture<ClientTupleRequestBase> readAsync(
-            ClientMessageUnpacker in,
-            IgniteTables tables,
-            ClientResourceRegistry resources,
-            @Nullable TxManager txManager,
-            boolean txReadOnly,
-            @Nullable NotificationSender notificationSender,
-            @Nullable HybridTimestampTracker tsTracker,
-            boolean keyOnly
-    ) {
-        return readAsync(in, tables, resources, txManager, txReadOnly, notificationSender, tsTracker, keyOnly, false);
+    public enum RequestOptions {
+        READ_ONLY,
+        KEY_ONLY,
+        READ_SECOND_TUPLE,
+        HAS_PRIORITY
     }
 
     public static CompletableFuture<ClientTupleRequestBase> readAsync(
             ClientMessageUnpacker in,
             IgniteTables tables,
             ClientResourceRegistry resources,
-            @Nullable TxManager txManager,
-            boolean txReadOnly,
+            TxManager txManager,
             @Nullable NotificationSender notificationSender,
             @Nullable HybridTimestampTracker tsTracker,
-            boolean keyOnly,
-            boolean readSecondTuple
+            EnumSet<RequestOptions> options
     ) {
-        assert (txManager != null) == (tsTracker != null) : "txManager and tsTracker must be both null or not null";

Review Comment:
   Why can the tracker still be null?
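   A minimal sketch of a tightened precondition, assuming the tracker is in fact always expected alongside the now non-nullable `txManager`; this is an assumption about the intended contract, not code from the PR:
   ```
   // Sketch only: if tsTracker can no longer be null, restore the invariant explicitly.
   assert tsTracker != null : "tsTracker must be provided when txManager is present";
   ```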



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java:
##########
@@ -111,26 +112,26 @@ public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable Transaction tx
             return emptyMapCompletedFuture();
         }
 
-        BiFunction<Collection<Tuple>, PartitionAwarenessProvider, CompletableFuture<Map<Tuple, Tuple>>> clo = (batch, provider) -> {
+        List<Transaction> txns = new ArrayList<>();
+
+        MapFunction<Tuple, Map<Tuple, Tuple>> clo = (batch, provider, txRequired) -> {
+            Transaction tx0 = tbl.startTxIfNeeded(tx, txns, txRequired);

Review Comment:
   It is quite unclear to maintain the array list (txns) inside the other method instead of doing it here:
   ```
   Transaction tx0 = tbl.startTxIfNeeded(tx, txRequired);
   txns.add(tx0);
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -855,6 +881,133 @@ void testBatchScenarioWithNoopEnlistmentImplicit() {
         assertEquals(batch.size(), view.removeAll(null, batch.keySet()).size());
     }
 
+    @Test
+    void testImplicitDirectMapping() {
+        Map<Partition, ClusterNode> map = table().partitionManager().primaryReplicasAsync().join();
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+        List<Tuple> tuples0 = generateKeysForNode(600, 2, map, server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(610, 1, map, server1.cluster().localNode(), table);
+
+        assertEquals(2, tuples0.size());
+        assertEquals(1, tuples1.size());
+
+        Map<Tuple, Tuple> batch = new HashMap<>();
+
+        for (Tuple tup : tuples0) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        for (Tuple tup : tuples1) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        KeyValueView<Tuple, Tuple> view = table.keyValueView();
+        Transaction tx = client().transactions().begin();
+        view.putAll(tx, batch);
+
+        // Should retry until timeout.
+        CompletableFuture<Map<Tuple, Tuple>> fut = view.getAllAsync(null, batch.keySet());
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        assertFalse(fut.isDone());
+        tx.commit();
+
+        assertEquals(batch.size(), fut.join().size(), "Implicit tx should be retried until timeout");
+
+        // Retry transaction without other locker.
+        assertEquals(batch.size(), view.getAll(null, batch.keySet()).size());
+
+        // Retry explicit transaction.
+        Transaction tx1 = client().transactions().begin();
+        assertEquals(batch.size(), view.getAll(tx1, batch.keySet()).size());
+        tx1.commit();
+
+        // Check that we don't get stuck on locks in a subsequent RW txn.
+        CompletableFuture.runAsync(() -> {

Review Comment:
   Why do you use an async closure here if you wait for the result immediately?
   Here and below.
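   A minimal sketch of the synchronous equivalent this comment suggests, using only the statements already quoted above:
   ```
   // Sketch: the runAsync(...).join() wrapper adds nothing when the result is awaited immediately.
   Transaction tx0 = client().transactions().begin();
   view.put(tx0, tuples0.get(0), val("newval0"));
   tx0.commit();

   view.put(null, tuples1.get(0), val("newval1"));
   ```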



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -531,6 +531,32 @@ static List<Tuple> generateKeysForNode(
         return keys;
     }
 
+    static List<Tuple> generateKeysForPartition(

Review Comment:
   This method is not used anywhere.



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -839,25 +868,202 @@ <E> CompletableFuture<List<E>> split(
                         idx++;
                     }
 
-                    List<CompletableFuture<List<E>>> res = new ArrayList<>(aff.size());
-                    List<Batch<E>> batches = new ArrayList<>();
+                    CompletableFuture<List<E>> resFut = new CompletableFuture<>();
+                    mapAndRetry(fun, keys, txns, mapped, new long[1], resFut, log);
+                    return resFut;
+                });
+    }
+
+    private static <R, E> void mapAndRetry(
+            MapFunction<E, R> mapFun,
+            @Nullable R initialValue, Reducer<R> reducer,
+            List<Transaction> txns,
+            Map<Integer, List<E>> mapped,
+            long[] startTs,
+            CompletableFuture<R> resFut,
+            IgniteLogger log
+    ) {
+        if (startTs[0] == 0) {
+            startTs[0] = System.nanoTime();
+        }
+
+        List<CompletableFuture<R>> res = new ArrayList<>();
+
+        for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+            res.add(mapFun.apply(entry.getValue(), PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+        }
+
+        CompletableFutures.allOf(res).handle((ignored, err) -> {
+            List<CompletableFuture<Void>> waitCommitFuts = List.of();
+            if (!txns.isEmpty()) {
+                boolean allRetryableExceptions = true;
+
+                for (int i = 0; i < res.size(); i++) {
+                    CompletableFuture<R> fut0 = res.get(i);
+                    if (fut0.isCompletedExceptionally()) {
+                        try {
+                            fut0.join();
+                        } catch (CompletionException e) {
+                            allRetryableExceptions = ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);

Review Comment:
   `allRetryableExceptions = allRetryableExceptions && ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);`
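   For context, a minimal sketch of the loop with the flag accumulated across all failed futures, based only on the code quoted above; otherwise a later lock-acquisition failure can overwrite the result of an earlier non-retryable one:
   ```
   boolean allRetryableExceptions = true;

   for (CompletableFuture<R> fut0 : res) {
       if (fut0.isCompletedExceptionally()) {
           try {
               fut0.join();
           } catch (CompletionException e) {
               // Accumulate: a single non-retryable failure must disable the retry.
               allRetryableExceptions &= ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
           }
       }
   }
   ```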



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -839,25 +868,202 @@ <E> CompletableFuture<List<E>> split(
                         idx++;
                     }
 
-                    List<CompletableFuture<List<E>>> res = new ArrayList<>(aff.size());
-                    List<Batch<E>> batches = new ArrayList<>();
+                    CompletableFuture<List<E>> resFut = new CompletableFuture<>();
+                    mapAndRetry(fun, keys, txns, mapped, new long[1], resFut, log);
+                    return resFut;
+                });
+    }
+
+    private static <R, E> void mapAndRetry(
+            MapFunction<E, R> mapFun,
+            @Nullable R initialValue, Reducer<R> reducer,
+            List<Transaction> txns,
+            Map<Integer, List<E>> mapped,
+            long[] startTs,
+            CompletableFuture<R> resFut,
+            IgniteLogger log
+    ) {
+        if (startTs[0] == 0) {
+            startTs[0] = System.nanoTime();
+        }
+
+        List<CompletableFuture<R>> res = new ArrayList<>();
+
+        for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+            res.add(mapFun.apply(entry.getValue(), PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+        }
+
+        CompletableFutures.allOf(res).handle((ignored, err) -> {
+            List<CompletableFuture<Void>> waitCommitFuts = List.of();
+            if (!txns.isEmpty()) {
+                boolean allRetryableExceptions = true;
+
+                for (int i = 0; i < res.size(); i++) {
+                    CompletableFuture<R> fut0 = res.get(i);
+                    if (fut0.isCompletedExceptionally()) {
+                        try {
+                            fut0.join();
+                        } catch (CompletionException e) {
+                            allRetryableExceptions = ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
+                        }
+                    }
+                    Transaction tx0 = txns.get(i);
+                    tx0.rollbackAsync().whenComplete((r, e) -> {
+                        if (e != null) {
+                            log.error("Failed to rollback a transactional batch: [tx=" + tx0 + ']', e);
+                        }
+                    });
+                }
+
+                if (err != null) {
+                    // Check if we can retry.
+                    long nowRelative = System.nanoTime();
+                    if (allRetryableExceptions && nowRelative - startTs[0] < DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+                        startTs[0] = nowRelative;
+                        txns.clear(); // This collection is re-filled on next map attempt.
+
+                        mapAndRetry(mapFun, initialValue, reducer, txns, mapped, startTs, resFut, log);
 
-                    for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
-                        res.add(fun.apply(entry.getValue().batch, PartitionAwarenessProvider.of(entry.getKey())));
-                        batches.add(entry.getValue());
+                        return null;
                     }
 
-                    return CompletableFuture.allOf(res.toArray(new CompletableFuture[0])).thenApply(ignored -> {
-                        var in = new ArrayList<E>(Collections.nCopies(keys.size(), null));
+                    resFut.completeExceptionally(err);
+
+                    return null;
+                }
+
+                waitCommitFuts = unlockFragments(txns, log);
+            } else {
+                if (err != null) {
+                    resFut.completeExceptionally(err);
+
+                    return null;
+                }
+            }
+
+            R in = initialValue;
+
+            for (CompletableFuture<R> val : res) {
+                in = reducer.reduce(in, val.getNow(null));
+            }
+
+            if (waitCommitFuts.isEmpty()) {
+                resFut.complete(in);
+            } else {
+                R finalIn = in;
+                CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e) -> {
+                    // Ignore errors.
+                    resFut.complete(finalIn);
+                });
+            }
 
-                        for (int i = 0; i < res.size(); i++) {
-                            CompletableFuture<List<E>> f = res.get(i);
-                            reduceWithKeepOrder(in, f.getNow(null), batches.get(i).originalIndices);
+            return null;
+        });
+    }
+
+    private static <E> void mapAndRetry(
+            MapFunction<E, List<E>> mapFun,
+            Collection<E> keys,
+            List<Transaction> txns,
+            Map<Integer, Batch<E>> mapped,
+            long[] startTs,
+            CompletableFuture<List<E>> resFut,
+            IgniteLogger log
+    ) {
+        if (startTs[0] == 0) {
+            startTs[0] = System.nanoTime();
+        }
+
+        List<CompletableFuture<List<E>>> res = new ArrayList<>(mapped.size());
+        List<Batch<E>> batches = new ArrayList<>();
+
+        for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
+            res.add(mapFun.apply(entry.getValue().batch, PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+            batches.add(entry.getValue());
+        }
+
+        CompletableFutures.allOf(res).handle((ignored, err) -> {
+            // TODO remove copy paste
+            List<CompletableFuture<Void>> waitCommitFuts = List.of();
+            if (!txns.isEmpty()) {
+                boolean allRetryableExceptions = true;
+
+                for (int i = 0; i < res.size(); i++) {
+                    CompletableFuture<?> fut0 = res.get(i);
+                    if (fut0.isCompletedExceptionally()) {
+                        try {
+                            fut0.join();
+                        } catch (CompletionException e) {
+                            allRetryableExceptions = ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
                         }
+                    }
+                    txns.get(i).rollbackAsync();
+                }
 
-                        return in;
-                    });
+                if (err != null) {
+                    // Check if we can retry.
+                    long nowRelative = System.nanoTime();
+                    if (allRetryableExceptions && nowRelative - startTs[0] < DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+                        startTs[0] = nowRelative;
+                        txns.clear(); // This collection is re-filled on next map attempt.
+
+                        mapAndRetry(mapFun, keys, txns, mapped, startTs, resFut, log);
+
+                        return null;
+                    }
+
+                    resFut.completeExceptionally(err);
+
+                    return null;
+                }
+
+                waitCommitFuts = unlockFragments(txns, log);
+            } else {
+                if (err != null) {
+                    resFut.completeExceptionally(err);
+
+                    return null;
+                }
+            }
+
+            var in = new ArrayList<E>(Collections.nCopies(keys.size(), null));
+
+            for (int i = 0; i < res.size(); i++) {
+                CompletableFuture<List<E>> f = res.get(i);
+                reduceWithKeepOrder(in, f.getNow(null), batches.get(i).originalIndices);
+            }
+
+            if (waitCommitFuts.isEmpty()) {
+                resFut.complete(in);
+            } else {
+                CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e) -> {
+                    // Ignore errors.
+                    resFut.complete(in);
                 });
+            }
+
+            return null;
+        });
+    }
+
+    @NotNull
+    private static List<CompletableFuture<Void>> unlockFragments(List<Transaction> txns, IgniteLogger log) {
+        List<CompletableFuture<Void>> waitCommitFuts = new ArrayList<>();
+
+        for (Transaction txn : txns) {
+            // ClientTransaction tx0 = (ClientTransaction) txn; TODO FIXME investigate error handling
+            ClientLazyTransaction tx0 = (ClientLazyTransaction) txn;
+            CompletableFuture<Void> fut = tx0.commitAsync().whenComplete((r, e) -> {

Review Comment:
   Why do you commit transactions here if you have already rolled them back in the code above?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -64,7 +64,7 @@ default InternalTransaction beginImplicitRo(HybridTimestampTracker timestampTrac
      *
      * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only
      *         transaction execution. The tracker is also used to determine the read timestamp for read-only transactions.
-     * @param readOnly {@code true} in order to start a read-only transaction, {@code false} in order to start read-write one.
+     * @param readOnly {@code true} in order to start a read snapshot transaction, {@code false} in order to start read-write one.

Review Comment:
   We always called it "read-only" before.



##########
modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java:
##########
@@ -220,7 +220,7 @@ public void testEnlistFailAfterCommit() {
 
         tx.commit();
 
-        WriteContext wc = new WriteContext(emptyTracker());
+        WriteContext wc = new WriteContext(emptyTracker(), 1);

Review Comment:
   It might be better to use a constant from `ClientOp`.
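   A minimal sketch of what that could look like; the specific constant is an assumption (whichever `ClientOp` code the magic `1` is meant to represent needs to be confirmed):
   ```
   // Sketch only: use a named op code instead of a magic number; TUPLE_UPSERT is a placeholder here.
   WriteContext wc = new WriteContext(emptyTracker(), ClientOp.TUPLE_UPSERT);
   ```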



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -839,25 +868,202 @@ <E> CompletableFuture<List<E>> split(
                         idx++;
                     }
 
-                    List<CompletableFuture<List<E>>> res = new ArrayList<>(aff.size());
-                    List<Batch<E>> batches = new ArrayList<>();
+                    CompletableFuture<List<E>> resFut = new CompletableFuture<>();
+                    mapAndRetry(fun, keys, txns, mapped, new long[1], resFut, log);
+                    return resFut;
+                });
+    }
+
+    private static <R, E> void mapAndRetry(
+            MapFunction<E, R> mapFun,
+            @Nullable R initialValue, Reducer<R> reducer,
+            List<Transaction> txns,
+            Map<Integer, List<E>> mapped,
+            long[] startTs,
+            CompletableFuture<R> resFut,
+            IgniteLogger log
+    ) {
+        if (startTs[0] == 0) {
+            startTs[0] = System.nanoTime();
+        }
+
+        List<CompletableFuture<R>> res = new ArrayList<>();
+
+        for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+            res.add(mapFun.apply(entry.getValue(), PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+        }
+
+        CompletableFutures.allOf(res).handle((ignored, err) -> {
+            List<CompletableFuture<Void>> waitCommitFuts = List.of();
+            if (!txns.isEmpty()) {
+                boolean allRetryableExceptions = true;
+
+                for (int i = 0; i < res.size(); i++) {
+                    CompletableFuture<R> fut0 = res.get(i);
+                    if (fut0.isCompletedExceptionally()) {
+                        try {
+                            fut0.join();
+                        } catch (CompletionException e) {
+                            allRetryableExceptions = ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
+                        }
+                    }
+                    Transaction tx0 = txns.get(i);
+                    tx0.rollbackAsync().whenComplete((r, e) -> {
+                        if (e != null) {
+                            log.error("Failed to rollback a transactional batch: [tx=" + tx0 + ']', e);
+                        }
+                    });
+                }
+
+                if (err != null) {
+                    // Check if we can retry.
+                    long nowRelative = System.nanoTime();
+                    if (allRetryableExceptions && nowRelative - startTs[0] < DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+                        startTs[0] = nowRelative;
+                        txns.clear(); // This collection is re-filled on next map attempt.
+
+                        mapAndRetry(mapFun, initialValue, reducer, txns, mapped, startTs, resFut, log);
 
-                    for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
-                        res.add(fun.apply(entry.getValue().batch, PartitionAwarenessProvider.of(entry.getKey())));
-                        batches.add(entry.getValue());
+                        return null;
                     }
 
-                    return CompletableFuture.allOf(res.toArray(new CompletableFuture[0])).thenApply(ignored -> {
-                        var in = new ArrayList<E>(Collections.nCopies(keys.size(), null));
+                    resFut.completeExceptionally(err);
+
+                    return null;
+                }
+
+                waitCommitFuts = unlockFragments(txns, log);
+            } else {
+                if (err != null) {
+                    resFut.completeExceptionally(err);
+
+                    return null;
+                }
+            }
+
+            R in = initialValue;
+
+            for (CompletableFuture<R> val : res) {
+                in = reducer.reduce(in, val.getNow(null));
+            }
+
+            if (waitCommitFuts.isEmpty()) {
+                resFut.complete(in);
+            } else {
+                R finalIn = in;
+                CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e) -> {
+                    // Ignore errors.
+                    resFut.complete(finalIn);
+                });
+            }
 
-                        for (int i = 0; i < res.size(); i++) {
-                            CompletableFuture<List<E>> f = res.get(i);
-                            reduceWithKeepOrder(in, f.getNow(null), batches.get(i).originalIndices);
+            return null;
+        });
+    }
+
+    private static <E> void mapAndRetry(
+            MapFunction<E, List<E>> mapFun,
+            Collection<E> keys,
+            List<Transaction> txns,
+            Map<Integer, Batch<E>> mapped,
+            long[] startTs,
+            CompletableFuture<List<E>> resFut,
+            IgniteLogger log
+    ) {
+        if (startTs[0] == 0) {
+            startTs[0] = System.nanoTime();
+        }
+
+        List<CompletableFuture<List<E>>> res = new ArrayList<>(mapped.size());
+        List<Batch<E>> batches = new ArrayList<>();
+
+        for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
+            res.add(mapFun.apply(entry.getValue().batch, PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+            batches.add(entry.getValue());
+        }
+
+        CompletableFutures.allOf(res).handle((ignored, err) -> {
+            // TODO remove copy paste
+            List<CompletableFuture<Void>> waitCommitFuts = List.of();
+            if (!txns.isEmpty()) {
+                boolean allRetryableExceptions = true;
+
+                for (int i = 0; i < res.size(); i++) {
+                    CompletableFuture<?> fut0 = res.get(i);
+                    if (fut0.isCompletedExceptionally()) {
+                        try {
+                            fut0.join();
+                        } catch (CompletionException e) {
+                            allRetryableExceptions = ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);

Review Comment:
   `allRetryableExceptions = allRetryableExceptions && ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);`



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -70,13 +77,17 @@
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.table.partition.PartitionManager;
 import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
  * Client table API implementation.
  */
 public class ClientTable implements Table {
+    private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(5000);

Review Comment:
   We need to create a ticket (and a TODO here) to make this default timeout configurable on the client side.
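   A minimal sketch of the shape this could take once a ticket exists; the ticket number is a placeholder:
   ```
   // TODO: IGNITE-<ticket> - make this timeout configurable on the client side instead of hard-coding 5 seconds.
   private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(5000);
   ```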



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -855,6 +881,133 @@ void testBatchScenarioWithNoopEnlistmentImplicit() {
         assertEquals(batch.size(), view.removeAll(null, batch.keySet()).size());
     }
 
+    @Test
+    void testImplicitDirectMapping() {
+        Map<Partition, ClusterNode> map = table().partitionManager().primaryReplicasAsync().join();
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+        List<Tuple> tuples0 = generateKeysForNode(600, 2, map, server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(610, 1, map, server1.cluster().localNode(), table);
+
+        assertEquals(2, tuples0.size());
+        assertEquals(1, tuples1.size());
+
+        Map<Tuple, Tuple> batch = new HashMap<>();
+
+        for (Tuple tup : tuples0) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        for (Tuple tup : tuples1) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        KeyValueView<Tuple, Tuple> view = table.keyValueView();
+        Transaction tx = client().transactions().begin();
+        view.putAll(tx, batch);
+
+        // Should retry until timeout.
+        CompletableFuture<Map<Tuple, Tuple>> fut = view.getAllAsync(null, batch.keySet());
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        assertFalse(fut.isDone());
+        tx.commit();
+
+        assertEquals(batch.size(), fut.join().size(), "Implicit tx should be retried until timeout");
+
+        // Retry transaction without other locker.
+        assertEquals(batch.size(), view.getAll(null, batch.keySet()).size());
+
+        // Retry explicit transaction.
+        Transaction tx1 = client().transactions().begin();
+        assertEquals(batch.size(), view.getAll(tx1, batch.keySet()).size());
+        tx1.commit();
+
+        // Check that we don't get stuck on locks in a subsequent RW txn.
+        CompletableFuture.runAsync(() -> {
+            Transaction tx0 = client().transactions().begin();
+            view.put(tx0, tuples0.get(0), val("newval0"));
+            tx0.commit();
+        }).join();
+
+        CompletableFuture.runAsync(() -> {
+            view.put(null, tuples1.get(0), val("newval1"));
+        }).join();
+    }
+
+    @Test
+    void testImplicitRecordDirectMapping() {
+        Map<Partition, ClusterNode> map = table().partitionManager().primaryReplicasAsync().join();
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+        List<Tuple> keys0 = generateKeysForNode(600, 2, map, server0.cluster().localNode(), table);
+        List<Tuple> keys1 = generateKeysForNode(610, 1, map, server1.cluster().localNode(), table);
+
+        assertEquals(2, keys0.size());
+        assertEquals(1, keys1.size());
+
+        List<Tuple> keys = new ArrayList<>();
+        List<Tuple> recsBatch = new ArrayList<>();
+
+        for (Tuple tup : keys0) {
+            recsBatch.add(kv(tup.intValue(0), tup.intValue(0) + ""));
+            keys.add(tup);
+        }
+
+        for (Tuple tup : keys1) {
+            recsBatch.add(kv(tup.intValue(0), tup.intValue(0) + ""));
+            keys.add(tup);
+        }
+
+        RecordView<Tuple> view = table.recordView();
+        Transaction tx = client().transactions().begin();
+        view.upsertAll(tx, recsBatch);
+
+        // Should retry until timeout.
+        CompletableFuture<List<Tuple>> fut = view.getAllAsync(null, keys);
+
+        try {
+            Thread.sleep(500);

Review Comment:
   The explicit waiting is not required here, because the test takes some time between getAll and committing the transaction anyway.



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java:
##########
@@ -111,26 +112,26 @@ public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable Transaction tx
             return emptyMapCompletedFuture();
         }
 
-        BiFunction<Collection<Tuple>, PartitionAwarenessProvider, CompletableFuture<Map<Tuple, Tuple>>> clo = (batch, provider) -> {
+        List<Transaction> txns = new ArrayList<>();
+
+        MapFunction<Tuple, Map<Tuple, Tuple>> clo = (batch, provider, txRequired) -> {
+            Transaction tx0 = tbl.startTxIfNeeded(tx, txns, txRequired);

Review Comment:
   Or even better, add a method (getTransactions()) to MapFunction.
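   A minimal sketch of the shape this suggestion could take; the `MapFunction` signature is inferred from the quoted lambda and is an assumption:
   ```
   // Sketch only: let the function own the transactions it starts, so callers
   // don't have to thread a List<Transaction> through every getAllAsync-style method.
   interface MapFunction<K, R> {
       CompletableFuture<R> apply(Collection<K> batch, PartitionAwarenessProvider provider, boolean txRequired);

       // Transactions started by this function while mapping batches.
       List<Transaction> getTransactions();
   }
   ```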



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
