vldpyatkov commented on code in PR #6779:
URL: https://github.com/apache/ignite-3/pull/6779#discussion_r2524820346
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java:
##########
@@ -74,53 +77,42 @@ public Tuple tuple2() {
return tuple2;
}
- public static CompletableFuture<ClientTupleRequestBase> readAsync(
- ClientMessageUnpacker in,
- IgniteTables tables,
- ClientResourceRegistry resources,
- @Nullable TxManager txManager,
- boolean txReadOnly,
- @Nullable NotificationSender notificationSender,
- @Nullable HybridTimestampTracker tsTracker,
- boolean keyOnly
- ) {
- return readAsync(in, tables, resources, txManager, txReadOnly,
notificationSender, tsTracker, keyOnly, false);
+ public enum RequestOptions {
+ READ_ONLY,
+ KEY_ONLY,
+ READ_SECOND_TUPLE,
+ HAS_PRIORITY
}
public static CompletableFuture<ClientTupleRequestBase> readAsync(
ClientMessageUnpacker in,
IgniteTables tables,
ClientResourceRegistry resources,
- @Nullable TxManager txManager,
- boolean txReadOnly,
+ TxManager txManager,
@Nullable NotificationSender notificationSender,
@Nullable HybridTimestampTracker tsTracker,
- boolean keyOnly,
- boolean readSecondTuple
+ EnumSet<RequestOptions> options
) {
- assert (txManager != null) == (tsTracker != null) : "txManager and
tsTracker must be both null or not null";
Review Comment:
Why can the tracker still be null?
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java:
##########
@@ -111,26 +112,26 @@ public CompletableFuture<Map<Tuple, Tuple>>
getAllAsync(@Nullable Transaction tx
return emptyMapCompletedFuture();
}
- BiFunction<Collection<Tuple>, PartitionAwarenessProvider,
CompletableFuture<Map<Tuple, Tuple>>> clo = (batch, provider) -> {
+ List<Transaction> txns = new ArrayList<>();
+
+ MapFunction<Tuple, Map<Tuple, Tuple>> clo = (batch, provider,
txRequired) -> {
+ Transaction tx0 = tbl.startTxIfNeeded(tx, txns, txRequired);
Review Comment:
It is unclear why the array list (txns) is maintained in the other method
instead of being filled in this place.
```
Transaction tx0 = tbl.startTxIfNeeded(tx, txRequired);
txns.add(tx0);
```
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -855,6 +881,133 @@ void testBatchScenarioWithNoopEnlistmentImplicit() {
assertEquals(batch.size(), view.removeAll(null,
batch.keySet()).size());
}
+ @Test
+ void testImplicitDirectMapping() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
+
+ assertEquals(2, tuples0.size());
+ assertEquals(1, tuples1.size());
+
+ Map<Tuple, Tuple> batch = new HashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ for (Tuple tup : tuples1) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ KeyValueView<Tuple, Tuple> view = table.keyValueView();
+ Transaction tx = client().transactions().begin();
+ view.putAll(tx, batch);
+
+ // Should retry until timeout.
+ CompletableFuture<Map<Tuple, Tuple>> fut = view.getAllAsync(null,
batch.keySet());
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse(fut.isDone());
+ tx.commit();
+
+ assertEquals(batch.size(), fut.join().size(), "Implicit tx should be
retried until timeout");
+
+ // Retry transaction without other locker.
+ assertEquals(batch.size(), view.getAll(null, batch.keySet()).size());
+
+ // Retry expliti transaction.
+ Transaction tx1 = client().transactions().begin();
+ assertEquals(batch.size(), view.getAll(tx1, batch.keySet()).size());
+ tx1.commit();
+
+ // Test if we don't stuck in locks in subsequent rw txn.
+ CompletableFuture.runAsync(() -> {
Review Comment:
Why do you wrap this in an async closure if you wait for the result
immediately?
The same applies to the next one.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -531,6 +531,32 @@ static List<Tuple> generateKeysForNode(
return keys;
}
+ static List<Tuple> generateKeysForPartition(
Review Comment:
This method is not used anywhere.
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -839,25 +868,202 @@ <E> CompletableFuture<List<E>> split(
idx++;
}
- List<CompletableFuture<List<E>>> res = new
ArrayList<>(aff.size());
- List<Batch<E>> batches = new ArrayList<>();
+ CompletableFuture<List<E>> resFut = new
CompletableFuture<>();
+ mapAndRetry(fun, keys, txns, mapped, new long[1], resFut,
log);
+ return resFut;
+ });
+ }
+
+ private static <R, E> void mapAndRetry(
+ MapFunction<E, R> mapFun,
+ @Nullable R initialValue, Reducer<R> reducer,
+ List<Transaction> txns,
+ Map<Integer, List<E>> mapped,
+ long[] startTs,
+ CompletableFuture<R> resFut,
+ IgniteLogger log
+ ) {
+ if (startTs[0] == 0) {
+ startTs[0] = System.nanoTime();
+ }
+
+ List<CompletableFuture<R>> res = new ArrayList<>();
+
+ for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+ res.add(mapFun.apply(entry.getValue(),
PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+ }
+
+ CompletableFutures.allOf(res).handle((ignored, err) -> {
+ List<CompletableFuture<Void>> waitCommitFuts = List.of();
+ if (!txns.isEmpty()) {
+ boolean allRetryableExceptions = true;
+
+ for (int i = 0; i < res.size(); i++) {
+ CompletableFuture<R> fut0 = res.get(i);
+ if (fut0.isCompletedExceptionally()) {
+ try {
+ fut0.join();
+ } catch (CompletionException e) {
+ allRetryableExceptions =
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
Review Comment:
`allRetryableExceptions = allRetryableExceptions &&
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);`
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -839,25 +868,202 @@ <E> CompletableFuture<List<E>> split(
idx++;
}
- List<CompletableFuture<List<E>>> res = new
ArrayList<>(aff.size());
- List<Batch<E>> batches = new ArrayList<>();
+ CompletableFuture<List<E>> resFut = new
CompletableFuture<>();
+ mapAndRetry(fun, keys, txns, mapped, new long[1], resFut,
log);
+ return resFut;
+ });
+ }
+
+ private static <R, E> void mapAndRetry(
+ MapFunction<E, R> mapFun,
+ @Nullable R initialValue, Reducer<R> reducer,
+ List<Transaction> txns,
+ Map<Integer, List<E>> mapped,
+ long[] startTs,
+ CompletableFuture<R> resFut,
+ IgniteLogger log
+ ) {
+ if (startTs[0] == 0) {
+ startTs[0] = System.nanoTime();
+ }
+
+ List<CompletableFuture<R>> res = new ArrayList<>();
+
+ for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+ res.add(mapFun.apply(entry.getValue(),
PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+ }
+
+ CompletableFutures.allOf(res).handle((ignored, err) -> {
+ List<CompletableFuture<Void>> waitCommitFuts = List.of();
+ if (!txns.isEmpty()) {
+ boolean allRetryableExceptions = true;
+
+ for (int i = 0; i < res.size(); i++) {
+ CompletableFuture<R> fut0 = res.get(i);
+ if (fut0.isCompletedExceptionally()) {
+ try {
+ fut0.join();
+ } catch (CompletionException e) {
+ allRetryableExceptions =
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
+ }
+ }
+ Transaction tx0 = txns.get(i);
+ tx0.rollbackAsync().whenComplete((r, e) -> {
+ if (e != null) {
+ log.error("Failed to rollback a transactional
batch: [tx=" + tx0 + ']', e);
+ }
+ });
+ }
+
+ if (err != null) {
+ // Check if we can retry.
+ long nowRelative = System.nanoTime();
+ if (allRetryableExceptions && nowRelative - startTs[0] <
DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+ startTs[0] = nowRelative;
+ txns.clear(); // This collection is re-filled on next
map attempt.
+
+ mapAndRetry(mapFun, initialValue, reducer, txns,
mapped, startTs, resFut, log);
- for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
- res.add(fun.apply(entry.getValue().batch,
PartitionAwarenessProvider.of(entry.getKey())));
- batches.add(entry.getValue());
+ return null;
}
- return CompletableFuture.allOf(res.toArray(new
CompletableFuture[0])).thenApply(ignored -> {
- var in = new
ArrayList<E>(Collections.nCopies(keys.size(), null));
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+
+ waitCommitFuts = unlockFragments(txns, log);
+ } else {
+ if (err != null) {
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+ }
+
+ R in = initialValue;
+
+ for (CompletableFuture<R> val : res) {
+ in = reducer.reduce(in, val.getNow(null));
+ }
+
+ if (waitCommitFuts.isEmpty()) {
+ resFut.complete(in);
+ } else {
+ R finalIn = in;
+ CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e)
-> {
+ // Ignore errors.
+ resFut.complete(finalIn);
+ });
+ }
- for (int i = 0; i < res.size(); i++) {
- CompletableFuture<List<E>> f = res.get(i);
- reduceWithKeepOrder(in, f.getNow(null),
batches.get(i).originalIndices);
+ return null;
+ });
+ }
+
+ private static <E> void mapAndRetry(
+ MapFunction<E, List<E>> mapFun,
+ Collection<E> keys,
+ List<Transaction> txns,
+ Map<Integer, Batch<E>> mapped,
+ long[] startTs,
+ CompletableFuture<List<E>> resFut,
+ IgniteLogger log
+ ) {
+ if (startTs[0] == 0) {
+ startTs[0] = System.nanoTime();
+ }
+
+ List<CompletableFuture<List<E>>> res = new ArrayList<>(mapped.size());
+ List<Batch<E>> batches = new ArrayList<>();
+
+ for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
+ res.add(mapFun.apply(entry.getValue().batch,
PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+ batches.add(entry.getValue());
+ }
+
+ CompletableFutures.allOf(res).handle((ignored, err) -> {
+ // TODO remove copy paste
+ List<CompletableFuture<Void>> waitCommitFuts = List.of();
+ if (!txns.isEmpty()) {
+ boolean allRetryableExceptions = true;
+
+ for (int i = 0; i < res.size(); i++) {
+ CompletableFuture<?> fut0 = res.get(i);
+ if (fut0.isCompletedExceptionally()) {
+ try {
+ fut0.join();
+ } catch (CompletionException e) {
+ allRetryableExceptions =
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
}
+ }
+ txns.get(i).rollbackAsync();
+ }
- return in;
- });
+ if (err != null) {
+ // Check if we can retry.
+ long nowRelative = System.nanoTime();
+ if (allRetryableExceptions && nowRelative - startTs[0] <
DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+ startTs[0] = nowRelative;
+ txns.clear(); // This collection is re-filled on next
map attempt.
+
+ mapAndRetry(mapFun, keys, txns, mapped, startTs,
resFut, log);
+
+ return null;
+ }
+
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+
+ waitCommitFuts = unlockFragments(txns, log);
+ } else {
+ if (err != null) {
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+ }
+
+ var in = new ArrayList<E>(Collections.nCopies(keys.size(), null));
+
+ for (int i = 0; i < res.size(); i++) {
+ CompletableFuture<List<E>> f = res.get(i);
+ reduceWithKeepOrder(in, f.getNow(null),
batches.get(i).originalIndices);
+ }
+
+ if (waitCommitFuts.isEmpty()) {
+ resFut.complete(in);
+ } else {
+ CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e)
-> {
+ // Ignore errors.
+ resFut.complete(in);
});
+ }
+
+ return null;
+ });
+ }
+
+ @NotNull
+ private static List<CompletableFuture<Void>>
unlockFragments(List<Transaction> txns, IgniteLogger log) {
+ List<CompletableFuture<Void>> waitCommitFuts = new ArrayList<>();
+
+ for (Transaction txn : txns) {
+ // ClientTransaction tx0 = (ClientTransaction) txn; TODO FIXME
investigate error handling
+ ClientLazyTransaction tx0 = (ClientLazyTransaction) txn;
+ CompletableFuture<Void> fut = tx0.commitAsync().whenComplete((r,
e) -> {
Review Comment:
Why do you commit transactions here if you have already rolled them back in
the code above?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -64,7 +64,7 @@ default InternalTransaction
beginImplicitRo(HybridTimestampTracker timestampTrac
*
* @param timestampTracker Observable timestamp tracker is used to track a
timestamp for either read-write or read-only
* transaction execution. The tracker is also used to determine
the read timestamp for read-only transactions.
- * @param readOnly {@code true} in order to start a read-only transaction,
{@code false} in order to start read-write one.
+ * @param readOnly {@code true} in order to start a read snapshot
transaction, {@code false} in order to start read-write one.
Review Comment:
We always called it "read-only" before.
##########
modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java:
##########
@@ -220,7 +220,7 @@ public void testEnlistFailAfterCommit() {
tx.commit();
- WriteContext wc = new WriteContext(emptyTracker());
+ WriteContext wc = new WriteContext(emptyTracker(), 1);
Review Comment:
It might be better to use a constant from `ClientOp`.
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -839,25 +868,202 @@ <E> CompletableFuture<List<E>> split(
idx++;
}
- List<CompletableFuture<List<E>>> res = new
ArrayList<>(aff.size());
- List<Batch<E>> batches = new ArrayList<>();
+ CompletableFuture<List<E>> resFut = new
CompletableFuture<>();
+ mapAndRetry(fun, keys, txns, mapped, new long[1], resFut,
log);
+ return resFut;
+ });
+ }
+
+ private static <R, E> void mapAndRetry(
+ MapFunction<E, R> mapFun,
+ @Nullable R initialValue, Reducer<R> reducer,
+ List<Transaction> txns,
+ Map<Integer, List<E>> mapped,
+ long[] startTs,
+ CompletableFuture<R> resFut,
+ IgniteLogger log
+ ) {
+ if (startTs[0] == 0) {
+ startTs[0] = System.nanoTime();
+ }
+
+ List<CompletableFuture<R>> res = new ArrayList<>();
+
+ for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+ res.add(mapFun.apply(entry.getValue(),
PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+ }
+
+ CompletableFutures.allOf(res).handle((ignored, err) -> {
+ List<CompletableFuture<Void>> waitCommitFuts = List.of();
+ if (!txns.isEmpty()) {
+ boolean allRetryableExceptions = true;
+
+ for (int i = 0; i < res.size(); i++) {
+ CompletableFuture<R> fut0 = res.get(i);
+ if (fut0.isCompletedExceptionally()) {
+ try {
+ fut0.join();
+ } catch (CompletionException e) {
+ allRetryableExceptions =
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
+ }
+ }
+ Transaction tx0 = txns.get(i);
+ tx0.rollbackAsync().whenComplete((r, e) -> {
+ if (e != null) {
+ log.error("Failed to rollback a transactional
batch: [tx=" + tx0 + ']', e);
+ }
+ });
+ }
+
+ if (err != null) {
+ // Check if we can retry.
+ long nowRelative = System.nanoTime();
+ if (allRetryableExceptions && nowRelative - startTs[0] <
DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+ startTs[0] = nowRelative;
+ txns.clear(); // This collection is re-filled on next
map attempt.
+
+ mapAndRetry(mapFun, initialValue, reducer, txns,
mapped, startTs, resFut, log);
- for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
- res.add(fun.apply(entry.getValue().batch,
PartitionAwarenessProvider.of(entry.getKey())));
- batches.add(entry.getValue());
+ return null;
}
- return CompletableFuture.allOf(res.toArray(new
CompletableFuture[0])).thenApply(ignored -> {
- var in = new
ArrayList<E>(Collections.nCopies(keys.size(), null));
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+
+ waitCommitFuts = unlockFragments(txns, log);
+ } else {
+ if (err != null) {
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+ }
+
+ R in = initialValue;
+
+ for (CompletableFuture<R> val : res) {
+ in = reducer.reduce(in, val.getNow(null));
+ }
+
+ if (waitCommitFuts.isEmpty()) {
+ resFut.complete(in);
+ } else {
+ R finalIn = in;
+ CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e)
-> {
+ // Ignore errors.
+ resFut.complete(finalIn);
+ });
+ }
- for (int i = 0; i < res.size(); i++) {
- CompletableFuture<List<E>> f = res.get(i);
- reduceWithKeepOrder(in, f.getNow(null),
batches.get(i).originalIndices);
+ return null;
+ });
+ }
+
+ private static <E> void mapAndRetry(
+ MapFunction<E, List<E>> mapFun,
+ Collection<E> keys,
+ List<Transaction> txns,
+ Map<Integer, Batch<E>> mapped,
+ long[] startTs,
+ CompletableFuture<List<E>> resFut,
+ IgniteLogger log
+ ) {
+ if (startTs[0] == 0) {
+ startTs[0] = System.nanoTime();
+ }
+
+ List<CompletableFuture<List<E>>> res = new ArrayList<>(mapped.size());
+ List<Batch<E>> batches = new ArrayList<>();
+
+ for (Entry<Integer, Batch<E>> entry : mapped.entrySet()) {
+ res.add(mapFun.apply(entry.getValue().batch,
PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+ batches.add(entry.getValue());
+ }
+
+ CompletableFutures.allOf(res).handle((ignored, err) -> {
+ // TODO remove copy paste
+ List<CompletableFuture<Void>> waitCommitFuts = List.of();
+ if (!txns.isEmpty()) {
+ boolean allRetryableExceptions = true;
+
+ for (int i = 0; i < res.size(); i++) {
+ CompletableFuture<?> fut0 = res.get(i);
+ if (fut0.isCompletedExceptionally()) {
+ try {
+ fut0.join();
+ } catch (CompletionException e) {
+ allRetryableExceptions =
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);
Review Comment:
`allRetryableExceptions = allRetryableExceptions &&
ExceptionUtils.matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR);`
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -70,13 +77,17 @@
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.PartitionManager;
import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Client table API implementation.
*/
public class ClientTable implements Table {
+ private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS =
TimeUnit.MILLISECONDS.toNanos(5000);
Review Comment:
We need to create a ticket (and add a TODO here) to configure a default timeout
on the client side.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -855,6 +881,133 @@ void testBatchScenarioWithNoopEnlistmentImplicit() {
assertEquals(batch.size(), view.removeAll(null,
batch.keySet()).size());
}
+ @Test
+ void testImplicitDirectMapping() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
+
+ assertEquals(2, tuples0.size());
+ assertEquals(1, tuples1.size());
+
+ Map<Tuple, Tuple> batch = new HashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ for (Tuple tup : tuples1) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ KeyValueView<Tuple, Tuple> view = table.keyValueView();
+ Transaction tx = client().transactions().begin();
+ view.putAll(tx, batch);
+
+ // Should retry until timeout.
+ CompletableFuture<Map<Tuple, Tuple>> fut = view.getAllAsync(null,
batch.keySet());
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse(fut.isDone());
+ tx.commit();
+
+ assertEquals(batch.size(), fut.join().size(), "Implicit tx should be
retried until timeout");
+
+ // Retry transaction without other locker.
+ assertEquals(batch.size(), view.getAll(null, batch.keySet()).size());
+
+ // Retry expliti transaction.
+ Transaction tx1 = client().transactions().begin();
+ assertEquals(batch.size(), view.getAll(tx1, batch.keySet()).size());
+ tx1.commit();
+
+ // Test if we don't stuck in locks in subsequent rw txn.
+ CompletableFuture.runAsync(() -> {
+ Transaction tx0 = client().transactions().begin();
+ view.put(tx0, tuples0.get(0), val("newval0"));
+ tx0.commit();
+ }).join();
+
+ CompletableFuture.runAsync(() -> {
+ view.put(null, tuples1.get(0), val("newval1"));
+ }).join();
+ }
+
+ @Test
+ void testImplicitRecordDirectMapping() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> keys0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
+ List<Tuple> keys1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
+
+ assertEquals(2, keys0.size());
+ assertEquals(1, keys1.size());
+
+ List<Tuple> keys = new ArrayList<>();
+ List<Tuple> recsBatch = new ArrayList<>();
+
+ for (Tuple tup : keys0) {
+ recsBatch.add(kv(tup.intValue(0), tup.intValue(0) + ""));
+ keys.add(tup);
+ }
+
+ for (Tuple tup : keys1) {
+ recsBatch.add(kv(tup.intValue(0), tup.intValue(0) + ""));
+ keys.add(tup);
+ }
+
+ RecordView<Tuple> view = table.recordView();
+ Transaction tx = client().transactions().begin();
+ view.upsertAll(tx, recsBatch);
+
+ // Should retry until timeout.
+ CompletableFuture<List<Tuple>> fut = view.getAllAsync(null, keys);
+
+ try {
+ Thread.sleep(500);
Review Comment:
The explicit waiting is not required here, because the test takes some time
between getAll and committing the transaction anyway.
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java:
##########
@@ -111,26 +112,26 @@ public CompletableFuture<Map<Tuple, Tuple>>
getAllAsync(@Nullable Transaction tx
return emptyMapCompletedFuture();
}
- BiFunction<Collection<Tuple>, PartitionAwarenessProvider,
CompletableFuture<Map<Tuple, Tuple>>> clo = (batch, provider) -> {
+ List<Transaction> txns = new ArrayList<>();
+
+ MapFunction<Tuple, Map<Tuple, Tuple>> clo = (batch, provider,
txRequired) -> {
+ Transaction tx0 = tbl.startTxIfNeeded(tx, txns, txRequired);
Review Comment:
Or even better, add a method (getTransactions()) to MapFunction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]