This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f6e8d70601a IGNITE-27948 Enable client-side retries in
runInTransaction (#7672)
f6e8d70601a is described below
commit f6e8d70601a1c91ea95c329327b70d02453f3372
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Fri Feb 27 11:28:35 2026 +0300
IGNITE-27948 Enable client-side retries in runInTransaction (#7672)
---
.../internal/client/proto/ErrorExtensions.java | 2 +
.../{ErrorExtensions.java => tx/ErrorFlags.java} | 37 +++-
.../handler/ClientInboundMessageHandler.java | 56 ++++--
.../ClientRetriableTransactionException.java} | 19 +-
.../ignite/internal/client/TcpClientChannel.java | 11 +-
.../tx/ClientTransactionKilledException.java | 3 +-
.../internal/tx/ItClientRunInTransactionTest.java | 72 +++++++
.../ignite/internal/tx/ItRunInTransactionTest.java | 208 +++++++++++++++++++++
8 files changed, 370 insertions(+), 38 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index 3ddd836ecea..de99dda7b21 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -28,4 +28,6 @@ public class ErrorExtensions {
public static final String DELAYED_ACK = "delayed-ack";
public static final String TX_KILL = "tx-kill";
+
+ public static final String FLAGS = "flags";
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ErrorFlags.java
similarity index 53%
copy from
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
copy to
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ErrorFlags.java
index 3ddd836ecea..8f303625514 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ErrorFlags.java
@@ -15,17 +15,40 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.proto;
+package org.apache.ignite.internal.client.proto.tx;
+
+import java.util.EnumSet;
/**
- * Error data extensions. When the server returns an error response, it may
contain additional data in a map. Keys are defined here.
+ * Error flags.
+ *
+ * <p>Every constant owns an integer bit mask; {@link #unpack(int)} converts a packed integer into the set of
+ * constants whose mask bits are set in it.
*/
-public class ErrorExtensions {
- public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
+public enum ErrorFlags {
+ RETRIABLE(1); // Mask 0x1; written by the server under ErrorExtensions.FLAGS when it classifies an error as retriable.
+
+ private final int mask; // Bit mask that represents this flag inside a packed integer.
- public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
+ ErrorFlags(int mask) {
+ this.mask = mask;
+ }
- public static final String DELAYED_ACK = "delayed-ack";
+ public int mask() { // Returns the bit mask of this flag.
+ return mask;
+ }
- public static final String TX_KILL = "tx-kill";
+ /**
+ * Unpack flags.
+ *
+ * <p>A flag is included when at least one of its mask bits is set in {@code mask}. Bits that match no declared
+ * flag are ignored.
+ *
+ * @param mask Packed mask.
+ * @return Set of flags; empty when no declared flag matches.
+ */
+ public static EnumSet<ErrorFlags> unpack(int mask) {
+ EnumSet<ErrorFlags> result = EnumSet.noneOf(
+ ErrorFlags.class);
+ for (ErrorFlags flag : values()) {
+ if ((mask & flag.mask()) != 0) {
+ result.add(flag);
+ }
+ }
+ return result;
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 036719498db..8a87e4ee458 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -138,6 +138,7 @@ import
org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.client.proto.ServerOp;
import org.apache.ignite.internal.client.proto.ServerOpResponseFlags;
+import org.apache.ignite.internal.client.proto.tx.ErrorFlags;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import
org.apache.ignite.internal.compute.executor.platform.PlatformComputeConnection;
@@ -184,6 +185,7 @@ import org.apache.ignite.network.IgniteCluster;
import org.apache.ignite.security.AuthenticationType;
import
org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -700,16 +702,28 @@ public class ClientInboundMessageHandler
}
private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
+ int extCnt = 0;
+ boolean retriable = false;
+
SchemaVersionMismatchException schemaVersionMismatchException =
findException(err, SchemaVersionMismatchException.class);
SqlBatchException sqlBatchException = findException(err,
SqlBatchException.class);
DelayedAckException delayedAckException = findException(err,
DelayedAckException.class);
- TransactionKilledException transactionKilledException =
findException(err, TransactionKilledException.class);
+ TransactionKilledException killedException = findException(err,
TransactionKilledException.class);
+
+ if (schemaVersionMismatchException != null || sqlBatchException !=
null || delayedAckException != null || killedException != null) {
+ extCnt = 1;
+ } else {
+ retriable = findException(err,
RetriableTransactionException.class) != null;
+ if (retriable) {
+ extCnt++;
+ }
+ }
err = firstNotNull(
schemaVersionMismatchException,
sqlBatchException,
delayedAckException,
- transactionKilledException,
+ killedException,
ExceptionUtils.unwrapCause(err)
);
@@ -739,22 +753,28 @@ public class ClientInboundMessageHandler
}
// Extensions.
- if (schemaVersionMismatchException != null) {
- packer.packInt(1); // 1 extension.
- packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
- packer.packInt(schemaVersionMismatchException.expectedVersion());
- } else if (sqlBatchException != null) {
- packer.packInt(1); // 1 extension.
- packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
- packer.packLongArray(sqlBatchException.updateCounters());
- } else if (delayedAckException != null) {
- packer.packInt(1); // 1 extension.
- packer.packString(ErrorExtensions.DELAYED_ACK);
- packer.packUuid(delayedAckException.txId());
- } else if (transactionKilledException != null) {
- packer.packInt(1); // 1 extension.
- packer.packString(ErrorExtensions.TX_KILL);
- packer.packUuid(transactionKilledException.txId());
+ if (extCnt > 0) {
+ packer.packInt(extCnt);
+
+ if (retriable) {
+ packer.packString(ErrorExtensions.FLAGS);
+ packer.packInt(ErrorFlags.RETRIABLE.mask());
+ }
+
+ if (schemaVersionMismatchException != null) {
+ packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
+
packer.packInt(schemaVersionMismatchException.expectedVersion());
+ } else if (sqlBatchException != null) {
+ // TODO IGNITE-28012 SQL_UPDATE_COUNTERS is an array and must
come last
+ packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
+ packer.packLongArray(sqlBatchException.updateCounters());
+ } else if (delayedAckException != null) {
+ packer.packString(ErrorExtensions.DELAYED_ACK);
+ packer.packUuid(delayedAckException.txId());
+ } else if (killedException != null) {
+ packer.packString(ErrorExtensions.TX_KILL);
+ packer.packUuid(killedException.txId());
+ }
} else {
packer.packNil(); // No extensions.
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientRetriableTransactionException.java
similarity index 62%
copy from
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
copy to
modules/client/src/main/java/org/apache/ignite/internal/client/ClientRetriableTransactionException.java
index 3ddd836ecea..dfea7601a62 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientRetriableTransactionException.java
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.proto;
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.tx.RetriableTransactionException;
/**
- * Error data extensions. When the server returns an error response, it may
contain additional data in a map. Keys are defined here.
+ * Wraps client exception cause for retry purposes, which is based on marker
interface RetriableTransactionException.
+ *
+ * <p>TcpClientChannel passes an instance of this class as the cause of the reconstructed exception when the
+ * server error carries the RETRIABLE flag, so the marker interface can be found in the cause chain.
*/
-public class ErrorExtensions {
- public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
-
- public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
-
- public static final String DELAYED_ACK = "delayed-ack";
-
- public static final String TX_KILL = "tx-kill";
+class ClientRetriableTransactionException extends IgniteException implements
RetriableTransactionException {
+ public ClientRetriableTransactionException(int code, Throwable cause) { // code: error code; cause: the wrapped original cause.
+ super(code, cause); // Both values are forwarded unchanged to IgniteException.
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 938b19a4e5a..cd4512d8330 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.client.proto.ResponseFlags.getErrorFlag;
import static
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.client.proto.HandshakeUtils;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
+import org.apache.ignite.internal.client.proto.tx.ErrorFlags;
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.client.tx.ClientTransactionKilledException;
import org.apache.ignite.internal.future.timeout.TimeoutObject;
@@ -549,7 +551,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
handlePartitionAssignmentChange(flags, unpacker);
handleObservableTimestamp(unpacker);
- Throwable err = ResponseFlags.getErrorFlag(flags) ?
readError(unpacker) : null;
+ Throwable err = getErrorFlag(flags) ? readError(unpacker) : null;
if (ResponseFlags.getNotificationFlag(flags)) {
handleNotification(resId, unpacker, err);
@@ -640,6 +642,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
var errClassName = unpacker.unpackString();
var errMsg = unpacker.tryUnpackNil() ? null : unpacker.unpackString();
+ boolean retriable = false;
IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null :
new IgniteException(traceId, code, unpacker.unpackString());
@@ -658,6 +661,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
return new ClientDelayedAckException(traceId, code, errMsg,
unpacker.unpackUuid(), causeWithStackTrace);
} else if (key.equals(ErrorExtensions.TX_KILL)) {
return new ClientTransactionKilledException(traceId, code,
errMsg, unpacker.unpackUuid(), causeWithStackTrace);
+ } else if (key.equals(ErrorExtensions.FLAGS)) {
+ EnumSet<ErrorFlags> flags =
ErrorFlags.unpack(unpacker.unpackInt());
+ retriable = flags.contains(ErrorFlags.RETRIABLE);
} else {
// Unknown extension - ignore.
unpacker.skipValues(1);
@@ -675,7 +681,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
try {
Class<? extends Throwable> errCls = (Class<? extends Throwable>)
Class.forName(errClassName);
- return copyExceptionWithCause(errCls, traceId, code, errMsg,
causeWithStackTrace);
+ return copyExceptionWithCause(errCls, traceId, code, errMsg,
+ retriable ? new ClientRetriableTransactionException(code,
causeWithStackTrace) : causeWithStackTrace);
} catch (ClassNotFoundException ignored) {
// Ignore: incompatible exception class. Fall back to generic
exception.
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
index bbbe3eee7fe..a79bc88d2d8 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
@@ -18,13 +18,14 @@
package org.apache.ignite.internal.client.tx;
import java.util.UUID;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
/**
* Reports a killed transaction.
*/
-public class ClientTransactionKilledException extends TransactionException {
+public class ClientTransactionKilledException extends TransactionException
implements RetriableTransactionException {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientRunInTransactionTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientRunInTransactionTest.java
new file mode 100644
index 00000000000..48c5de29aec
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientRunInTransactionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.lang.String.format;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.sql.PartitionMappingProvider;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+class ItClientRunInTransactionTest extends ItRunInTransactionTest { // Re-runs the inherited runInTransaction scenarios through an IgniteClient.
+ private IgniteClient client; // Created in startClient(), closed in closeClient().
+
+ @BeforeEach
+ void startClient() { // Connects a client to the client port of an alive cluster node.
+ client = IgniteClient.builder()
+ .addresses("localhost:" +
unwrapIgniteImpl(cluster.aliveNode()).clientAddress().port())
+ .build();
+ }
+
+ @AfterEach
+ void closeClient() { // Closes the client if startClient() managed to create one.
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ Ignite ignite() { // Makes the inherited tests operate on the client.
+ return client;
+ }
+
+ @Override
+ void initAwareness(Table table) { // Runs one INSERT for key 0 through ClientSql, then inspects the cached partition mapping providers; the table argument is not used.
+ Tuple key0 = key(0);
+ ClientSql sql = (ClientSql) ignite().sql();
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+
+
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready); // NOTE(review): the boolean result of allMatch is discarded, so nothing is asserted or awaited here; confirm whether a check or a wait was intended.
+ }
+
+ @Override
+ UUID txId(Transaction tx) { // Returns the id of the started transaction behind the given client transaction.
+ ClientLazyTransaction tx0 = ClientLazyTransaction.get(tx);
+ return tx0.startedTx().txId();
+ }
+}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItRunInTransactionTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItRunInTransactionTest.java
new file mode 100644
index 00000000000..3390f82d51c
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItRunInTransactionTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.lang.String.format;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.argumentSet;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.lang.IgniteTriFunction;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Retry scenarios in runInTransaction.
+ *
+ * <p>Each test begins an older transaction that writes one key, then runs a closure through runInTransaction (or
+ * runInTransactionAsync) that writes another key and then the same key from a younger transaction. The closure throws
+ * a RuntimeException with the message retry on its second invocation, and the tests expect exactly two invocations.
+ *
+ * <p>The package-private methods ignite(), txId() and initAwareness() are overridden by ItClientRunInTransactionTest.
+ */
+public class ItRunInTransactionTest extends ClusterPerTestIntegrationTest {
+ protected static final String TABLE_NAME = "TEST";
+
+ protected static final String COLUMN_KEY = "ID";
+
+ protected static final String COLUMN_VAL = "VAl"; // NOTE(review): mixed case looks like a typo for VAL; presumably harmless because the name is used unquoted in SQL - confirm.
+
+ @Override
+ protected int initialNodes() { // The scenarios run against a single node.
+ return 1;
+ }
+
+ Ignite ignite() { // API entry point used by the tests; here the first cluster node.
+ return node(0);
+ }
+
+ UUID txId(Transaction tx) { // Returns the transaction id; here tx is cast to InternalTransaction.
+ return ((InternalTransaction) tx).id();
+ }
+
+ private Table createTestTable() { // Creates the test table if it does not exist and returns its handle.
+ ignite().sql().executeScript(
+ "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (" + COLUMN_KEY
+ " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR)");
+
+ return ignite().tables().table(TABLE_NAME);
+ }
+
+ void initAwareness(Table table) { // Blocks until the primaryReplicasAsync() future of the table completes.
+ table.partitionDistribution().primaryReplicasAsync().join();
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncTestContextFactory")
+ public void testSync(SyncTestContext ctx) { // Synchronous variant; ctx.put selects the key-value or the SQL write path.
+ Table table = createTestTable();
+ initAwareness(table); // Makes test behavior deterministic.
+
+ Transaction olderTx = ignite().transactions().begin(); // NOTE(review): olderTx is never committed or rolled back in this test; presumably cleaned up with the per-test cluster - confirm.
+
+ Tuple key = key(1);
+ Tuple key2 = key(2);
+
+ ctx.put.apply(ignite(), olderTx, key);
+
+ AtomicInteger cnt = new AtomicInteger(); // Counts invocations of the closure.
+
+ CompletableFuture<Void> fut = IgniteTestUtils.runAsync(() -> {
+ ignite().transactions().runInTransaction(youngerTx -> {
+ if (cnt.incrementAndGet() == 2) { // Second invocation of the closure, i.e. the first retry.
+ throw new RuntimeException("retry");
+ }
+
+ ctx.put.apply(ignite(), youngerTx, key2);
+ assertTrue(txId(olderTx).compareTo(txId(youngerTx)) < 0); // The id of olderTx must compare lower than the id of youngerTx.
+ // Younger is not allowed to wait for older.
+ ctx.put.apply(ignite(), youngerTx, key); // Same key olderTx has already written; presumably fails with a retriable error and triggers the retry - confirm.
+ });
+ });
+
+ assertThat(fut, willThrowWithCauseOrSuppressed(Exception.class,
"retry")); // The exception thrown on the second invocation must reach the caller.
+
+ assertEquals(2, cnt.get(), "Should retry at least once"); // NOTE(review): the check requires exactly two invocations, while the message says at least once.
+ }
+
+ @ParameterizedTest
+ @MethodSource("asyncTestContextFactory")
+ public void testAsync(AsyncTestContext ctx) { // Asynchronous variant of testSync, based on runInTransactionAsync.
+ Table table = createTestTable();
+ initAwareness(table); // Makes test behavior deterministic.
+
+ Transaction olderTx = ignite().transactions().begin(); // NOTE(review): olderTx is never committed or rolled back in this test; presumably cleaned up with the per-test cluster - confirm.
+
+ Tuple key = key(1);
+ Tuple key2 = key(2);
+
+ ctx.put.apply(ignite(), olderTx, key).join();
+
+ AtomicInteger cnt = new AtomicInteger(); // Counts invocations of the closure.
+
+ CompletableFuture<Void> fut =
ignite().transactions().runInTransactionAsync(youngerTx -> {
+ if (cnt.incrementAndGet() == 2) { // Second invocation of the closure, i.e. the first retry.
+ throw new RuntimeException("retry");
+ }
+
+ return ctx.put.apply(ignite(), youngerTx, key2).thenCompose(r -> {
+ assertTrue(txId(olderTx).compareTo(txId(youngerTx)) < 0,
+ "Wrong ordering: old=" + olderTx.toString() + ", new="
+ youngerTx.toString());
+ // Younger is not allowed to wait for older.
+ return ctx.put.apply(ignite(), youngerTx, key); // Same key olderTx has already written; presumably fails with a retriable error and triggers the retry - confirm.
+ });
+ });
+
+ assertThat(fut, willThrowWithCauseOrSuppressed(Exception.class,
"retry")); // The exception thrown on the second invocation must reach the caller.
+
+ assertEquals(2, cnt.get(), "Should retry at least once"); // NOTE(review): the check requires exactly two invocations, while the message says at least once.
+ }
+
+ protected static Tuple val(String v) { // Builds a tuple that holds v in the value column.
+ return Tuple.create().set(COLUMN_VAL, v);
+ }
+
+ protected static Tuple key(Integer k) { // Builds a tuple that holds k in the key column.
+ return Tuple.create().set(COLUMN_KEY, k);
+ }
+
+ private static Stream<Arguments> syncTestContextFactory() { // Parameters of testSync: key-value and SQL write paths.
+ return Stream.of(
+ argumentSet("kv", new
SyncTestContext(ItRunInTransactionTest::putKv)),
+ argumentSet("sql", new
SyncTestContext(ItRunInTransactionTest::putSql))
+ );
+ }
+
+ private static Stream<Arguments> asyncTestContextFactory() { // Parameters of testAsync: key-value and SQL write paths.
+ return Stream.of(
+ argumentSet("kv", new
AsyncTestContext(ItRunInTransactionTest::putKvAsync)),
+ argumentSet("sql", new
AsyncTestContext(ItRunInTransactionTest::putSqlAsync))
+ );
+ }
+
+ /**
+ * Sync test context.
+ *
+ * <p>Holds the blocking write operation that testSync applies to an Ignite instance, a transaction and a key tuple.
+ */
+ protected static class SyncTestContext {
+ final IgniteTriFunction<Ignite, Transaction, Tuple, Void> put;
+
+ SyncTestContext(IgniteTriFunction<Ignite, Transaction, Tuple, Void>
put) {
+ this.put = put;
+ }
+ }
+
+ /**
+ * Async test context.
+ *
+ * <p>Holds the future-returning write operation that testAsync applies to an Ignite instance, a transaction and a
+ * key tuple.
+ */
+ protected static class AsyncTestContext {
+ final IgniteTriFunction<Ignite, Transaction, Tuple,
CompletableFuture<Void>> put;
+
+ AsyncTestContext(IgniteTriFunction<Ignite, Transaction, Tuple,
CompletableFuture<Void>> put) {
+ this.put = put;
+ }
+ }
+
+ private static CompletableFuture<Void> putSqlAsync(Ignite client,
Transaction tx, Tuple key) { // Async SQL INSERT of the key and its string form; the result set is mapped to null.
+ return client.sql()
+ .executeAsync(tx, format("INSERT INTO %s (%s, %s) VALUES (?,
?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
+ key.intValue(0) + "").thenApply(r -> null);
+ }
+
+ private static Void putKv(Ignite client, Transaction tx, Tuple key) { // Blocking key-value put of the key and its string form.
+ client.tables().tables().get(0).keyValueView().put(tx, key,
val(key.intValue(0) + "")); // Uses the first table returned by tables(); assumes the test table is the only one - TODO confirm.
+ return null;
+ }
+
+ private static Void putSql(Ignite client, @Nullable Transaction tx, Tuple
key) { // Blocking SQL INSERT of the key and its string form.
+ client.sql()
+ .execute(tx, format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
+ key.intValue(0) + "");
+ return null;
+ }
+
+ private static CompletableFuture<Void> putKvAsync(Ignite client,
Transaction tx, Tuple key) { // Async key-value put of the key and its string form.
+ return client.tables().tables().get(0).keyValueView().putAsync(tx,
key, val(key.intValue(0) + "")); // Uses the first table returned by tables(); assumes the test table is the only one - TODO confirm.
+ }
+}