This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f6e8d70601a IGNITE-27948 Enable client-side retries in runInTransaction (#7672)
f6e8d70601a is described below

commit f6e8d70601a1c91ea95c329327b70d02453f3372
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Fri Feb 27 11:28:35 2026 +0300

    IGNITE-27948 Enable client-side retries in runInTransaction (#7672)
---
 .../internal/client/proto/ErrorExtensions.java     |   2 +
 .../{ErrorExtensions.java => tx/ErrorFlags.java}   |  37 +++-
 .../handler/ClientInboundMessageHandler.java       |  56 ++++--
 .../ClientRetriableTransactionException.java}      |  19 +-
 .../ignite/internal/client/TcpClientChannel.java   |  11 +-
 .../tx/ClientTransactionKilledException.java       |   3 +-
 .../internal/tx/ItClientRunInTransactionTest.java  |  72 +++++++
 .../ignite/internal/tx/ItRunInTransactionTest.java | 208 +++++++++++++++++++++
 8 files changed, 370 insertions(+), 38 deletions(-)

diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index 3ddd836ecea..de99dda7b21 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -28,4 +28,6 @@ public class ErrorExtensions {
     public static final String DELAYED_ACK = "delayed-ack";
 
     public static final String TX_KILL = "tx-kill";
+
+    public static final String FLAGS = "flags";
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ErrorFlags.java
similarity index 53%
copy from modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
copy to modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ErrorFlags.java
index 3ddd836ecea..8f303625514 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ErrorFlags.java
@@ -15,17 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.proto;
+package org.apache.ignite.internal.client.proto.tx;
+
+import java.util.EnumSet;
 
 /**
- * Error data extensions. When the server returns an error response, it may contain additional data in a map. Keys are defined here.
+ * Error flags.
  */
-public class ErrorExtensions {
-    public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
+public enum ErrorFlags {
+    RETRIABLE(1);
+
+    private final int mask;
 
-    public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
+    ErrorFlags(int mask) {
+        this.mask = mask;
+    }
 
-    public static final String DELAYED_ACK = "delayed-ack";
+    public int mask() {
+        return mask;
+    }
 
-    public static final String TX_KILL = "tx-kill";
+    /**
+     * Unpack flags.
+     *
+     * @param mask Packed mask.
+     * @return Set of flags.
+     */
+    public static EnumSet<ErrorFlags> unpack(int mask) {
+        EnumSet<ErrorFlags> result = EnumSet.noneOf(
+                ErrorFlags.class);
+        for (ErrorFlags flag : values()) {
+            if ((mask & flag.mask()) != 0) {
+                result.add(flag);
+            }
+        }
+        return result;
+    }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 036719498db..8a87e4ee458 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -138,6 +138,7 @@ import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
 import org.apache.ignite.internal.client.proto.ServerOp;
 import org.apache.ignite.internal.client.proto.ServerOpResponseFlags;
+import org.apache.ignite.internal.client.proto.tx.ErrorFlags;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.executor.platform.PlatformComputeConnection;
@@ -184,6 +185,7 @@ import org.apache.ignite.network.IgniteCluster;
 import org.apache.ignite.security.AuthenticationType;
 import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
 import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -700,16 +702,28 @@ public class ClientInboundMessageHandler
     }
 
     private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
+        int extCnt = 0;
+        boolean retriable = false;
+
         SchemaVersionMismatchException schemaVersionMismatchException = findException(err, SchemaVersionMismatchException.class);
         SqlBatchException sqlBatchException = findException(err, SqlBatchException.class);
         DelayedAckException delayedAckException = findException(err, DelayedAckException.class);
-        TransactionKilledException transactionKilledException = findException(err, TransactionKilledException.class);
+        TransactionKilledException killedException = findException(err, TransactionKilledException.class);
+
+        if (schemaVersionMismatchException != null || sqlBatchException != null || delayedAckException != null || killedException != null) {
+            extCnt = 1;
+        } else {
+            retriable = findException(err, RetriableTransactionException.class) != null;
+            if (retriable) {
+                extCnt++;
+            }
+        }
 
         err = firstNotNull(
                 schemaVersionMismatchException,
                 sqlBatchException,
                 delayedAckException,
-                transactionKilledException,
+                killedException,
                 ExceptionUtils.unwrapCause(err)
         );
 
@@ -739,22 +753,28 @@ public class ClientInboundMessageHandler
         }
 
         // Extensions.
-        if (schemaVersionMismatchException != null) {
-            packer.packInt(1); // 1 extension.
-            packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
-            packer.packInt(schemaVersionMismatchException.expectedVersion());
-        } else if (sqlBatchException != null) {
-            packer.packInt(1); // 1 extension.
-            packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
-            packer.packLongArray(sqlBatchException.updateCounters());
-        } else if (delayedAckException != null) {
-            packer.packInt(1); // 1 extension.
-            packer.packString(ErrorExtensions.DELAYED_ACK);
-            packer.packUuid(delayedAckException.txId());
-        } else if (transactionKilledException != null) {
-            packer.packInt(1); // 1 extension.
-            packer.packString(ErrorExtensions.TX_KILL);
-            packer.packUuid(transactionKilledException.txId());
+        if (extCnt > 0) {
+            packer.packInt(extCnt);
+
+            if (retriable) {
+                packer.packString(ErrorExtensions.FLAGS);
+                packer.packInt(ErrorFlags.RETRIABLE.mask());
+            }
+
+            if (schemaVersionMismatchException != null) {
+                packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
+                packer.packInt(schemaVersionMismatchException.expectedVersion());
+            } else if (sqlBatchException != null) {
+                // TODO IGNITE-28012 SQL_UPDATE_COUNTERS is an array and must come last
+                packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
+                packer.packLongArray(sqlBatchException.updateCounters());
+            } else if (delayedAckException != null) {
+                packer.packString(ErrorExtensions.DELAYED_ACK);
+                packer.packUuid(delayedAckException.txId());
+            } else if (killedException != null) {
+                packer.packString(ErrorExtensions.TX_KILL);
+                packer.packUuid(killedException.txId());
+            }
         } else {
             packer.packNil(); // No extensions.
         }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientRetriableTransactionException.java
similarity index 62%
copy from modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
copy to modules/client/src/main/java/org/apache/ignite/internal/client/ClientRetriableTransactionException.java
index 3ddd836ecea..dfea7601a62 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientRetriableTransactionException.java
@@ -15,17 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.proto;
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.tx.RetriableTransactionException;
 
 /**
- * Error data extensions. When the server returns an error response, it may contain additional data in a map. Keys are defined here.
+ * Wraps client exception cause for retry purposes, which is based on marker interface RetriableTransactionException.
  */
-public class ErrorExtensions {
-    public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
-
-    public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
-
-    public static final String DELAYED_ACK = "delayed-ack";
-
-    public static final String TX_KILL = "tx-kill";
+class ClientRetriableTransactionException extends IgniteException implements RetriableTransactionException {
+    public ClientRetriableTransactionException(int code, Throwable cause) {
+        super(code, cause);
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 938b19a4e5a..cd4512d8330 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.client.proto.ResponseFlags.getErrorFlag;
 import static org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.client.proto.HandshakeUtils;
 import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
+import org.apache.ignite.internal.client.proto.tx.ErrorFlags;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.client.tx.ClientTransactionKilledException;
 import org.apache.ignite.internal.future.timeout.TimeoutObject;
@@ -549,7 +551,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         handlePartitionAssignmentChange(flags, unpacker);
         handleObservableTimestamp(unpacker);
 
-        Throwable err = ResponseFlags.getErrorFlag(flags) ? readError(unpacker) : null;
+        Throwable err = getErrorFlag(flags) ? readError(unpacker) : null;
 
         if (ResponseFlags.getNotificationFlag(flags)) {
             handleNotification(resId, unpacker, err);
@@ -640,6 +642,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
         var errClassName = unpacker.unpackString();
         var errMsg = unpacker.tryUnpackNil() ? null : unpacker.unpackString();
+        boolean retriable = false;
 
         IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null : new IgniteException(traceId, code, unpacker.unpackString());
 
@@ -658,6 +661,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
                 return new ClientDelayedAckException(traceId, code, errMsg, unpacker.unpackUuid(), causeWithStackTrace);
             } else if (key.equals(ErrorExtensions.TX_KILL)) {
                 return new ClientTransactionKilledException(traceId, code, errMsg, unpacker.unpackUuid(), causeWithStackTrace);
+            } else if (key.equals(ErrorExtensions.FLAGS)) {
+                EnumSet<ErrorFlags> flags = ErrorFlags.unpack(unpacker.unpackInt());
+                retriable = flags.contains(ErrorFlags.RETRIABLE);
             } else {
                 // Unknown extension - ignore.
                 unpacker.skipValues(1);
@@ -675,7 +681,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
         try {
             Class<? extends Throwable> errCls = (Class<? extends Throwable>) Class.forName(errClassName);
-            return copyExceptionWithCause(errCls, traceId, code, errMsg, causeWithStackTrace);
+            return copyExceptionWithCause(errCls, traceId, code, errMsg,
+                    retriable ? new ClientRetriableTransactionException(code, causeWithStackTrace) : causeWithStackTrace);
         } catch (ClassNotFoundException ignored) {
             // Ignore: incompatible exception class. Fall back to generic exception.
         }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
index bbbe3eee7fe..a79bc88d2d8 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
@@ -18,13 +18,14 @@
 package org.apache.ignite.internal.client.tx;
 
 import java.util.UUID;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Reports a killed transaction.
  */
-public class ClientTransactionKilledException extends TransactionException {
+public class ClientTransactionKilledException extends TransactionException implements RetriableTransactionException {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientRunInTransactionTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientRunInTransactionTest.java
new file mode 100644
index 00000000000..48c5de29aec
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientRunInTransactionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.lang.String.format;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.sql.PartitionMappingProvider;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+class ItClientRunInTransactionTest extends ItRunInTransactionTest {
+    private IgniteClient client;
+
+    @BeforeEach
+    void startClient() {
+        client = IgniteClient.builder()
+                .addresses("localhost:" + unwrapIgniteImpl(cluster.aliveNode()).clientAddress().port())
+                .build();
+    }
+
+    @AfterEach
+    void closeClient() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    Ignite ignite() {
+        return client;
+    }
+
+    @Override
+    void initAwareness(Table table) {
+        Tuple key0 = key(0);
+        ClientSql sql = (ClientSql) ignite().sql();
+        sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+                key0.intValue(0), key0.intValue(0) + "");
+
+        sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready);
+    }
+
+    @Override
+    UUID txId(Transaction tx) {
+        ClientLazyTransaction tx0 = ClientLazyTransaction.get(tx);
+        return tx0.startedTx().txId();
+    }
+}
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItRunInTransactionTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItRunInTransactionTest.java
new file mode 100644
index 00000000000..3390f82d51c
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItRunInTransactionTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.lang.String.format;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.argumentSet;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.lang.IgniteTriFunction;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Retry scenarios in runInTransaction.
+ */
+public class ItRunInTransactionTest extends ClusterPerTestIntegrationTest {
+    protected static final String TABLE_NAME = "TEST";
+
+    protected static final String COLUMN_KEY = "ID";
+
+    protected static final String COLUMN_VAL = "VAl";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    Ignite ignite() {
+        return node(0);
+    }
+
+    UUID txId(Transaction tx) {
+        return ((InternalTransaction) tx).id();
+    }
+
+    private Table createTestTable() {
+        ignite().sql().executeScript(
+                "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (" + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR)");
+
+        return ignite().tables().table(TABLE_NAME);
+    }
+
+    void initAwareness(Table table) {
+        table.partitionDistribution().primaryReplicasAsync().join();
+    }
+
+    @ParameterizedTest
+    @MethodSource("syncTestContextFactory")
+    public void testSync(SyncTestContext ctx) {
+        Table table = createTestTable();
+        initAwareness(table); // Makes test behavior determined.
+
+        Transaction olderTx = ignite().transactions().begin();
+
+        Tuple key = key(1);
+        Tuple key2 = key(2);
+
+        ctx.put.apply(ignite(), olderTx, key);
+
+        AtomicInteger cnt = new AtomicInteger();
+
+        CompletableFuture<Void> fut = IgniteTestUtils.runAsync(() -> {
+            ignite().transactions().runInTransaction(youngerTx -> {
+                if (cnt.incrementAndGet() == 2) {
+                    throw new RuntimeException("retry");
+                }
+
+                ctx.put.apply(ignite(), youngerTx, key2);
+                assertTrue(txId(olderTx).compareTo(txId(youngerTx)) < 0);
+                // Younger is not allowed to wait for older.
+                ctx.put.apply(ignite(), youngerTx, key);
+            });
+        });
+
+        assertThat(fut, willThrowWithCauseOrSuppressed(Exception.class, "retry"));
+
+        assertEquals(2, cnt.get(), "Should retry at least once");
+    }
+
+    @ParameterizedTest
+    @MethodSource("asyncTestContextFactory")
+    public void testAsync(AsyncTestContext ctx) {
+        Table table = createTestTable();
+        initAwareness(table); // Makes test behavior determined.
+
+        Transaction olderTx = ignite().transactions().begin();
+
+        Tuple key = key(1);
+        Tuple key2 = key(2);
+
+        ctx.put.apply(ignite(), olderTx, key).join();
+
+        AtomicInteger cnt = new AtomicInteger();
+
+        CompletableFuture<Void> fut = ignite().transactions().runInTransactionAsync(youngerTx -> {
+            if (cnt.incrementAndGet() == 2) {
+                throw new RuntimeException("retry");
+            }
+
+            return ctx.put.apply(ignite(), youngerTx, key2).thenCompose(r -> {
+                assertTrue(txId(olderTx).compareTo(txId(youngerTx)) < 0,
+                        "Wrong ordering: old=" + olderTx.toString() + ", new=" + youngerTx.toString());
+                // Younger is not allowed to wait for older.
+                return ctx.put.apply(ignite(), youngerTx, key);
+            });
+        });
+
+        assertThat(fut, willThrowWithCauseOrSuppressed(Exception.class, "retry"));
+
+        assertEquals(2, cnt.get(), "Should retry at least once");
+    }
+
+    protected static Tuple val(String v) {
+        return Tuple.create().set(COLUMN_VAL, v);
+    }
+
+    protected static Tuple key(Integer k) {
+        return Tuple.create().set(COLUMN_KEY, k);
+    }
+
+    private static Stream<Arguments> syncTestContextFactory() {
+        return Stream.of(
+                argumentSet("kv", new SyncTestContext(ItRunInTransactionTest::putKv)),
+                argumentSet("sql", new SyncTestContext(ItRunInTransactionTest::putSql))
+        );
+    }
+
+    private static Stream<Arguments> asyncTestContextFactory() {
+        return Stream.of(
+                argumentSet("kv", new AsyncTestContext(ItRunInTransactionTest::putKvAsync)),
+                argumentSet("sql", new AsyncTestContext(ItRunInTransactionTest::putSqlAsync))
+        );
+    }
+
+    /**
+     * Sync test context.
+     */
+    protected static class SyncTestContext {
+        final IgniteTriFunction<Ignite, Transaction, Tuple, Void> put;
+
+        SyncTestContext(IgniteTriFunction<Ignite, Transaction, Tuple, Void> put) {
+            this.put = put;
+        }
+    }
+
+    /**
+     * Async test context.
+     */
+    protected static class AsyncTestContext {
+        final IgniteTriFunction<Ignite, Transaction, Tuple, CompletableFuture<Void>> put;
+
+        AsyncTestContext(IgniteTriFunction<Ignite, Transaction, Tuple, CompletableFuture<Void>> put) {
+            this.put = put;
+        }
+    }
+
+    private static CompletableFuture<Void> putSqlAsync(Ignite client, Transaction tx, Tuple key) {
+        return client.sql()
+                .executeAsync(tx, format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
+                        key.intValue(0) + "").thenApply(r -> null);
+    }
+
+    private static Void putKv(Ignite client, Transaction tx, Tuple key) {
+        client.tables().tables().get(0).keyValueView().put(tx, key, val(key.intValue(0) + ""));
+        return null;
+    }
+
+    private static Void putSql(Ignite client, @Nullable Transaction tx, Tuple key) {
+        client.sql()
+                .execute(tx, format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
+                        key.intValue(0) + "");
+        return null;
+    }
+
+    private static CompletableFuture<Void> putKvAsync(Ignite client, Transaction tx, Tuple key) {
+        return client.tables().tables().get(0).keyValueView().putAsync(tx, key, val(key.intValue(0) + ""));
+    }
+}

Reply via email to