ibessonov commented on code in PR #7799:
URL: https://github.com/apache/ignite-3/pull/7799#discussion_r3039906279


##########
modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java:
##########
@@ -1614,7 +1630,7 @@ public int hashCode() {
     private static Stream<Arguments> killTestContextFactory() {
         return Stream.of(
                 argumentSet("kv", new KillTestContext(TransactionException.class, ItThinClientTransactionsTest::putKv)),
-                argumentSet("sql", new KillTestContext(SqlException.class, ItThinClientTransactionsTest::putSql))
+                argumentSet("sql", new KillTestContext(IgniteException.class, ItThinClientTransactionsTest::putSql))

Review Comment:
   Can we check for a more specific exception type here? `IgniteException` is too broad for an integration test.
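
   One option, sketched here under the assumption that the SQL path wraps a more specific exception somewhere in its cause chain, is to walk the chain and assert on that type. `CauseChain.findCause` is a hypothetical helper written with the JDK only; Ignite may already have an equivalent utility.

   ```java
   import java.util.Optional;

   public final class CauseChain {
       private CauseChain() {
       }

       /** Hypothetical helper: returns the first throwable in the cause chain that is an instance of {@code type}. */
       public static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
           // The depth limit guards against pathological cyclic cause chains.
           for (int depth = 0; t != null && depth < 64; depth++, t = t.getCause()) {
               if (type.isInstance(t)) {
                   return Optional.of(type.cast(t));
               }
           }
           return Optional.empty();
       }
   }
   ```

   The assertion could then check that the expected type (for example `TransactionException`, if that is what the SQL path actually wraps) is present in the chain, rather than only matching on `IgniteException`.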



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -112,18 +122,31 @@ public void testDataConsistency() throws InterruptedException {
             readThreads[i].start();
         }
 
+        log.info("Started {} writers", WRITE_PARALLELISM);
+        log.info("Started {} readers", READ_PARALLELISM);
+
         long cur = System.currentTimeMillis();
 
+        long curOps = ops.sum();
+
         while (cur + DURATION_MILLIS > System.currentTimeMillis()) {
             Thread.sleep(1000);
 
-            log.info("Waiting...");
+            long tmp = ops.sum();
+            if (tmp == curOps) {
+                throw new AssertionError("Test doesn't make progress");
+            }

Review Comment:
   This should be `assertNotEquals` instead.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -50,19 +58,20 @@ public class ItDataConsistencyTest extends ClusterPerClassIntegrationTest {
     private static final String ZONE_NAME = "test_zone";
     private static final String TABLE_NAME = "accounts";
     private static final int WRITE_PARALLELISM = Runtime.getRuntime().availableProcessors();
-    private static final int READ_PARALLELISM = 1;
+    private static final int READ_PARALLELISM = 0;
     private static final int ACCOUNTS_COUNT = WRITE_PARALLELISM * 10;
     private static final double INITIAL = 1000;
     private static final double TOTAL = ACCOUNTS_COUNT * INITIAL;
-    private static final int DURATION_MILLIS = 10000;
+    private static final int DURATION_MILLIS = 30000;
 
     private CyclicBarrier startBar = new CyclicBarrier(WRITE_PARALLELISM + READ_PARALLELISM, () -> log.info("Before test"));
     private LongAdder ops = new LongAdder();
     private LongAdder fails = new LongAdder();
+    private LongAdder restarts = new LongAdder();
     private LongAdder readOps = new LongAdder();
     private LongAdder readFails = new LongAdder();
     private AtomicBoolean stop = new AtomicBoolean();
-    private Random rng = new Random();
+    private Random rng = new Random(0);

Review Comment:
   I feel like a random seed is the better option. If we need reproducibility, we should choose the seed in `setUp` and print it to the log, instead of always using 0.
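
   A minimal sketch of that approach, using only the JDK. The `test.seed` system property used to replay a failed run is a hypothetical name, and `System.out` stands in for the test's `log.info`.

   ```java
   import java.util.Random;
   import java.util.concurrent.ThreadLocalRandom;

   public class SeededRng {
       /** Hypothetical system property for replaying a failed run with a known seed. */
       static final String SEED_PROPERTY = "test.seed";

       private long seed;
       private Random rng;

       /** In the real test this would be the {@code @BeforeEach} method. */
       void setUp() {
           String fixed = System.getProperty(SEED_PROPERTY);
           seed = fixed != null ? Long.parseLong(fixed) : ThreadLocalRandom.current().nextLong();
           // The real test would use log.info(...) instead of stdout.
           System.out.println("Using random seed " + seed + " (rerun with -D" + SEED_PROPERTY + "=" + seed + ")");
           rng = new Random(seed);
       }

       long seed() {
           return seed;
       }

       Random rng() {
           return rng;
       }
   }
   ```

   Note that with several writer threads drawing from one shared `Random`, the thread interleaving still differs between runs, so the seed fixes the sequence of generated values but not which thread receives which value.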



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {

Review Comment:
   Can we move this class, and maybe some other related changes, into a new PR? I feel like it just complicates an already large set of changes.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -1031,13 +1035,35 @@ public void processDelayedAck(Object ignored, @Nullable Throwable err) {
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
-        var deadlockPreventionPolicy = new DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
+        var deadlockPreventionPolicy = new WoundWaitDeadlockPreventionPolicy() {

Review Comment:
   I believe that this class should not be anonymous.
   
   Are there plans to make `DEFAULT_LOCK_TIMEOUT` configurable somehow? It is `0` now (as far as I can see), so we can't even test any alternative.
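
   A sketch of the named-class alternative with an overridable timeout. It uses a simplified stand-in interface, not Ignite's actual `DeadlockPreventionPolicy` (only `waitTimeout()` appears in this diff), and the property name `tx.lockWaitTimeoutMillis` is hypothetical; the unit of the timeout is also an assumption.

   ```java
   public class LockPolicies {
       /** Simplified stand-in for the real policy interface; not Ignite's actual API. */
       interface DeadlockPreventionPolicy {
           long waitTimeout();
       }

       /** Hypothetical property that lets tests try timeouts other than the default. */
       static final String WAIT_TIMEOUT_PROPERTY = "tx.lockWaitTimeoutMillis";

       static final long DEFAULT_LOCK_TIMEOUT = 0;

       /** Named (not anonymous) policy class: easy to find, reuse and instantiate in unit tests. */
       static final class WoundWaitPolicy implements DeadlockPreventionPolicy {
           private final long waitTimeoutMillis;

           WoundWaitPolicy(long waitTimeoutMillis) {
               if (waitTimeoutMillis < 0) {
                   throw new IllegalArgumentException("waitTimeoutMillis must be >= 0: " + waitTimeoutMillis);
               }
               this.waitTimeoutMillis = waitTimeoutMillis;
           }

           @Override
           public long waitTimeout() {
               return waitTimeoutMillis;
           }
       }

       /** Reads the timeout from the hypothetical property, falling back to the current default. */
       static DeadlockPreventionPolicy fromSystemProperties() {
           return new WoundWaitPolicy(Long.getLong(WAIT_TIMEOUT_PROPERTY, DEFAULT_LOCK_TIMEOUT));
       }
   }
   ```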



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+    /** Field updater for inflights. */
+    private static final AtomicLongFieldUpdater<CleanupContext> UPDATER = newUpdater(CleanupContext.class, "inflights");
+
+    /** Txn contexts. */
+    private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+    /**
+     * Registers the inflight for a transaction.
+     *
+     * @param txId The transaction id.
+     * @param testPred Test predicate.

Review Comment:
   What exactly is it testing? I can't understand that from the javadoc, and the implementation is not very clear either.
   
   Documentation for the entire class would be welcome. For example, it's not clear to me what the nullability of `finishFut` means or why `removeInflight` has no data race. There are some implicit invariants that are hard to see from the code.
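
   To illustrate the kind of class-level documentation that would help, here is a deliberately simplified, hypothetical tracker (no `finishFut`, no test predicate) whose invariants are stated up front. It is not a description of how `PartitionInflights` actually works.

   ```java
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;

   /**
    * Tracks the number of in-flight requests per transaction (simplified, hypothetical sketch).
    *
    * <p>Invariants:
    * <ul>
    *     <li>A transaction has an entry in {@code txCtxMap} if and only if it has at least one in-flight request.</li>
    *     <li>Every counter update happens inside {@code compute}/{@code computeIfPresent} for that transaction's key.
    *     {@link ConcurrentHashMap} runs these remapping functions atomically per key, so the "decrement to zero and
    *     remove" step cannot race with a concurrent registration.</li>
    *     <li>{@code inflights} is volatile so that readers using plain {@code get} see the latest count.</li>
    * </ul>
    */
   public class InflightTracker {
       private static final class Context {
           volatile long inflights;
       }

       private final ConcurrentHashMap<UUID, Context> txCtxMap = new ConcurrentHashMap<>();

       /** Registers one more in-flight request for {@code txId}. */
       public void addInflight(UUID txId) {
           txCtxMap.compute(txId, (id, ctx) -> {
               Context c = ctx == null ? new Context() : ctx;
               c.inflights = c.inflights + 1; // Safe: compute() runs this atomically for the key.
               return c;
           });
       }

       /** Unregisters one in-flight request; drops the entry when the count reaches zero. */
       public void removeInflight(UUID txId) {
           txCtxMap.computeIfPresent(txId, (id, ctx) -> {
               ctx.inflights = ctx.inflights - 1;
               return ctx.inflights == 0 ? null : ctx; // Returning null removes the mapping.
           });
       }

       /** Returns the current number of in-flight requests, or 0 if the transaction is not tracked. */
       public long inflights(UUID txId) {
           Context ctx = txCtxMap.get(txId);
           return ctx == null ? 0 : ctx.inflights;
       }

       public boolean isTracked(UUID txId) {
           return txCtxMap.containsKey(txId);
       }
   }
   ```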



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java:
##########
@@ -150,4 +169,1220 @@ public void testCompatibilityLockMapSizePropertyNameWasNotChanged() {
                 LOCK_MAP_SIZE_PROPERTY_NAME
         );
     }
+
+    @Test
+    public void testSingleKeyWrite() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+
+        assertTrue(fut0.isDone());
+
+        Collection<UUID> queue = lockManager.queue(key);
+
+        assertTrue(queue.size() == 1 && queue.iterator().next().equals(txId1));
+
+        Waiter waiter = lockManager.waiter(key, txId1);
+
+        assertTrue(waiter.locked());
+
+        lockManager.release(fut0.join());
+    }
+
+    @Test
+    public void testSingleKeyWriteLock() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
+
+        assertTrue(fut0.isDone());
+
+        assertTrue(txId2.compareTo(txId1) > 0);
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+
+        assertFalse(fut1.isDone());
+
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertFalse(lockManager.waiter(key, txId1).locked());
+
+        lockManager.release(fut0.join());
+
+        assertTrue(fut1.isDone());
+
+        assertNull(lockManager.waiter(key, txId2));
+        assertTrue(lockManager.waiter(key, txId1).locked());
+
+        lockManager.release(fut1.join());
+
+        assertNull(lockManager.waiter(key, txId2));
+        assertNull(lockManager.waiter(key, txId1));
+    }
+
+    @Test
+    public void downgradeLockOutOfTurnTest() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        lockManager.acquire(txId0, key, S).join();
+        Lock lock = lockManager.acquire(txId2, key, S).join();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut0.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+        expectConflict(fut2);
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+        fut1.join();
+
+        assertFalse(fut0.isDone());
+
+        lockManager.release(lock);
+        fut0.thenAccept(l -> lockManager.release(l));
+    }
+
+    @Test
+    public void upgradeLockImmediatelyTest() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, IS);
+        assertTrue(fut.isDone());
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, IS);
+        assertTrue(fut0.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, IS);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, key, IX);
+        assertTrue(fut2.isDone());
+
+        lockManager.release(fut1.join());
+    }
+
+    @Test
+    public void testSingleKeyReadWriteLock() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        UUID txId3 = TestTransactionIds.newTransactionId();
+        assertTrue(txId3.compareTo(txId2) > 0);
+        assertTrue(txId2.compareTo(txId1) > 0);
+        assertTrue(txId1.compareTo(txId0) > 0);
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, key, S);
+        assertTrue(fut3.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut0.isDone());
+
+        assertTrue(lockManager.waiter(key, txId3).locked());
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertTrue(lockManager.waiter(key, txId1).locked());
+        assertFalse(lockManager.waiter(key, txId0).locked());
+
+        lockManager.release(fut1.join());
+
+        assertTrue(lockManager.waiter(key, txId3).locked());
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertNull(lockManager.waiter(key, txId1));
+        assertFalse(lockManager.waiter(key, txId0).locked());
+
+        lockManager.release(fut3.join());
+
+        assertNull(lockManager.waiter(key, txId3));
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertNull(lockManager.waiter(key, txId1));
+        assertFalse(lockManager.waiter(key, txId0).locked());
+
+        lockManager.release(fut2.join());
+
+        assertNull(lockManager.waiter(key, txId3));
+        assertNull(lockManager.waiter(key, txId2));
+        assertNull(lockManager.waiter(key, txId1));
+        assertTrue(lockManager.waiter(key, txId0).locked());
+    }
+
+    @Test
+    public void testSingleKeyReadWriteConflict() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        LockKey key = lockKey();
+
+        // Lock in order
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, S);
+        assertTrue(fut0.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut1.isDone());
+
+        lockManager.release(fut0.join());
+        assertTrue(fut1.isDone());
+
+        lockManager.release(fut1.join());
+
+        assertTrue(lockManager.queue(key).isEmpty());
+
+        // Lock not in order
+        fut0 = lockManager.acquire(txId0, key, S);
+        assertTrue(fut0.isDone());
+
+        try {
+            lockManager.acquire(txId1, key, X).join();
+
+            fail();
+        } catch (CompletionException e) {
+            // Expected.
+        }

Review Comment:
   ```suggestion
           assertThat(lockManager.acquire(txId1, key, X), CompletableFutureExceptionMatcher.willThrow(CompletionException.class));
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -144,6 +155,28 @@ private static int intProperty(SystemLocalConfiguration systemProperties, String
         return property == null ? defaultValue : Integer.parseInt(property.propertyValue());
     }
 
+    private Exception resolveTransactionSealedException(UUID txId) {
+        TxStateMeta meta = txStateVolatileStorage.state(txId);
+        Throwable cause = meta == null ? null : meta.lastException();
+        boolean isFinishedDueToTimeout = meta != null && meta.isFinishedDueToTimeoutOrFalse();
+        boolean isFinishedDueToError = meta != null && !isFinishedDueToTimeout && meta.lastExceptionErrorCode() != null;
+        Throwable publicCause = isFinishedDueToError ? cause : null;
+        Integer causeErrorCode = meta == null ? null : meta.lastExceptionErrorCode();
+
+        return new TransactionException(
+                finishedTransactionErrorCode(isFinishedDueToTimeout, isFinishedDueToError),
+                format("{} [{}, txState={}].",
+                        finishedTransactionErrorMessage(

Review Comment:
   As a separate change, could we please have a variant of `finishedTransactionErrorMessage` that accepts `TxStateMeta`, for example? I see the code that calculates these arguments duplicated over and over again, which is a clear antipattern. A TODO with a Jira link would be enough; please don't fix it in this PR.
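
   A possible shape for that follow-up, sketched with a hypothetical stand-in for `TxStateMeta` that models only the accessors used in the hunk above (`finishedDueToTimeout` stands in for `isFinishedDueToTimeoutOrFalse()`). The derived flags are computed once, in one place, instead of at every call site.

   ```java
   public final class FinishedTxCauses {
       /** Hypothetical stand-in for TxStateMeta with only the fields this logic reads. */
       public record TxMeta(Throwable lastException, Integer lastExceptionErrorCode, boolean finishedDueToTimeout) {
       }

       /** Everything the error code and message builders need, derived once from the meta. */
       public record FinishedTxCause(
               boolean finishedDueToTimeout,
               boolean finishedDueToError,
               Throwable publicCause,
               Integer causeErrorCode
       ) {
           /** Mirrors the inline derivation in resolveTransactionSealedException; {@code meta} may be null. */
           public static FinishedTxCause from(TxMeta meta) {
               if (meta == null) {
                   return new FinishedTxCause(false, false, null, null);
               }
               boolean dueToTimeout = meta.finishedDueToTimeout();
               boolean dueToError = !dueToTimeout && meta.lastExceptionErrorCode() != null;
               return new FinishedTxCause(
                       dueToTimeout,
                       dueToError,
                       dueToError ? meta.lastException() : null,
                       meta.lastExceptionErrorCode()
               );
           }
       }

       private FinishedTxCauses() {
       }
   }
   ```

   With something like this, `finishedTransactionErrorCode` and `finishedTransactionErrorMessage` could each take a single argument instead of a hand-computed list of flags.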



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2981,6 +2962,7 @@ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingl
                 });
             }
             case RW_UPSERT: {
+                // TODO IGNITE-28450

Review Comment:
   Please add a short message that explains the issue.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -1072,45 +1187,44 @@ private List<WaiterImpl> release(UUID txId) {
          *
          * @return List of waiters to notify.
          */
-        private List<WaiterImpl> unlockCompatibleWaiters() {
-            if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) {
+        private List<Runnable> unlockCompatibleWaiters() {
+            if (waiters.isEmpty()) {
                 return emptyList();
             }
 
-            ArrayList<WaiterImpl> toNotify = new ArrayList<>();
-            Set<UUID> toFail = new HashSet<>();
+            ArrayList<Runnable> toNotify = new ArrayList<>();

Review Comment:
   ```suggestion
               List<Runnable> toNotify = new ArrayList<>();
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -996,15 +1111,15 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) {
          */
         @Override
         public boolean tryRelease(UUID txId) {
-            Collection<WaiterImpl> toNotify;
+            Collection<Runnable> toNotify;

Review Comment:
   Just a note: a dedicated interface instead of `Runnable` would make code navigation simpler. It would be trivial to find all its implementations, which the current code doesn't allow.
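
   For example, a small functional interface keeps lambda-friendly call sites while letting the IDE list every implementation and usage. The names `LockNotification` and `GrantLock` are hypothetical, and the real notifications in `HeapLockManager` may do more than complete a future.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class LockNotifications {
       /** A pending waiter notification, used instead of a bare Runnable (hypothetical name). */
       @FunctionalInterface
       interface LockNotification {
           void fire();
       }

       /** Named implementation: completes a waiter's future with the granted lock. */
       static final class GrantLock<T> implements LockNotification {
           private final CompletableFuture<T> future;
           private final T lock;

           GrantLock(CompletableFuture<T> future, T lock) {
               this.future = future;
               this.lock = lock;
           }

           @Override
           public void fire() {
               future.complete(lock);
           }
       }

       /** Runs the collected notifications in order. */
       static void fireAll(List<? extends LockNotification> toNotify) {
           for (LockNotification n : toNotify) {
               n.fire();
           }
       }
   }
   ```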



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+public class TpccBenchmarkNodeRunner {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10942;
+    private static final int BASE_REST_PORT = 10300;
+
+    private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+    protected static Ignite publicIgnite;
+    protected static IgniteImpl igniteImpl;
+
+    public static void main(String[] args) throws Exception {
+        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+        runner.startCluster();
+    }
+
+    public IgniteImpl node(int idx) {
+        return unwrapIgniteImpl(igniteServers.get(idx).api());
+    }
+
+    private void startCluster() throws Exception {
+        Path workDir = workDir();
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        @Language("HOCON")
+        String configTemplate = "ignite {\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"

Review Comment:
   ```suggestion
                   + "  network: {\n"
                   + "    port:{},\n"
                   + "    nodeFinder:{\n"
                   + "      netClusterNodes: [ {} ]\n"
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+public class TpccBenchmarkNodeRunner {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10942;
+    private static final int BASE_REST_PORT = 10300;
+
+    private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+    protected static Ignite publicIgnite;
+    protected static IgniteImpl igniteImpl;
+
+    public static void main(String[] args) throws Exception {
+        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();

Review Comment:
   Is it possible to reuse the code from `org.apache.ignite.internal.Cluster` or other test classes? I feel like there's a lot of copy-paste going on here.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+public class TpccBenchmarkNodeRunner {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10942;
+    private static final int BASE_REST_PORT = 10300;
+
+    private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+    protected static Ignite publicIgnite;
+    protected static IgniteImpl igniteImpl;
+
+    public static void main(String[] args) throws Exception {
+        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+        runner.startCluster();
+    }
+
+    public IgniteImpl node(int idx) {
+        return unwrapIgniteImpl(igniteServers.get(idx).api());
+    }
+
+    private void startCluster() throws Exception {
+        Path workDir = workDir();
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        @Language("HOCON")
+        String configTemplate = "ignite {\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  storage.profiles: {"
+                + "        " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+                + "        " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " + pageMemorySize() + " "
+                + "  },\n"
+                + "  clientConnector: { port:{} },\n"
+                + "  clientConnector.sendServerExceptionStackTraceToClient: true\n"
+                + "  rest.port: {},\n"
+                + "  raft.fsync = " + fsync() + ",\n"
+                + "  system.partitionsLogPath = \"" + logPath() + "\",\n"
+                + "  failureHandler.handler: {\n"
+                + "      type: \"" + StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"

Review Comment:
   How safe is it to use the "halt" failure handler in `integrationTest`? Maybe "stop node" is enough?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -174,6 +199,16 @@ private void validate() {
         }
 
         assertEquals(TOTAL, total0, "Total amount invariant is not preserved");
+
+        for (int i = 0; i < initialNodes(); i++) {
+            IgniteImpl ignite = unwrapIgniteImpl(node(i));
+            try {
+                await("node " + i + " should release all locks").atMost(3, TimeUnit.SECONDS)
+                        .until(() -> ignite.txManager().lockManager().isEmpty());
+            } catch (ConditionTimeoutException e) {
+                throw e;
+            }

Review Comment:
   This is clearly an artifact of active development. Please remove this `try` block.



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java:
##########
@@ -203,6 +203,10 @@ public void testImplicitTransactionRetry() {
 
     @Test
     public void testImplicitTransactionTimeout() {
+        if (!txManager(accounts).lockManager().policy().reverse()) {
+            return; // This test scenario is only applicable to WaitDie.
+        }

Review Comment:
   ```suggestion
           assumeTrue(txManager(accounts).lockManager().policy().reverse(), "This test scenario is only applicable to WaitDie.");
   ```



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java:
##########
@@ -105,6 +105,11 @@ protected void customizeInitParameters(InitParametersBuilder builder) {
 
     @Test
     public void testLockExceptionRetry() {
+        IgniteImpl ignite = node0();
+        if (!ignite.txManager().lockManager().policy().reverse()) {
+            return; // This test scenario is only applicable to reversed policy.
+        }

Review Comment:
   ```suggestion
           assumeTrue(ignite.txManager().lockManager().policy().reverse(), "This test scenario is only applicable to reversed policy.");
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -112,18 +122,31 @@ public void testDataConsistency() throws InterruptedException {
             readThreads[i].start();
         }
 
+        log.info("Started {} writers", WRITE_PARALLELISM);
+        log.info("Started {} readers", READ_PARALLELISM);
+
         long cur = System.currentTimeMillis();
 
+        long curOps = ops.sum();
+
         while (cur + DURATION_MILLIS > System.currentTimeMillis()) {
             Thread.sleep(1000);

Review Comment:
   I know it's not your code, but `1000` ms is too long for a pause. Can we make it smaller, please?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10942;
+    private static final int BASE_REST_PORT = 10300;
+
+    private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+    protected static Ignite publicIgnite;
+    protected static IgniteImpl igniteImpl;
+
+    public static void main(String[] args) throws Exception {
+        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+        runner.startCluster();
+    }
+
+    public IgniteImpl node(int idx) {
+        return unwrapIgniteImpl(igniteServers.get(idx).api());
+    }
+
+    private void startCluster() throws Exception {
+        Path workDir = workDir();
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        @Language("HOCON")
+        String configTemplate = "ignite {\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  storage.profiles: {"
+                + "        " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+                + "        " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " + pageMemorySize() + " "
+                + "  },\n"
+                + "  clientConnector: { port:{} },\n"
+                + "  clientConnector.sendServerExceptionStackTraceToClient: true\n"
+                + "  rest.port: {},\n"
+                + "  raft.fsync = " + fsync() + ",\n"
+                + "  system.partitionsLogPath = \"" + logPath() + "\",\n"
+                + "  failureHandler.handler: {\n"
+                + "      type: \"" + StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
+                + "      tryStop: true,\n"
+                + "      timeoutMillis: 60000,\n" // 1 minute for graceful shutdown
+                + "  },\n"
+                + "}";
+
+        for (int i = 0; i < nodes(); i++) {
+            int port = BASE_PORT + i;
+            String nodeName = nodeName(port);
+
+            String config = IgniteStringFormatter.format(configTemplate, port, connectNodeAddr,
+                    BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+            igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName, config, workDir.resolve(nodeName)));
+        }
+
+        String metaStorageNodeName = nodeName(BASE_PORT);
+
+        InitParameters initParameters = InitParameters.builder()
+                .metaStorageNodeNames(metaStorageNodeName)
+                .clusterName("cluster")
+                .clusterConfiguration(clusterConfiguration())
+                .build();
+
+        TestIgnitionManager.init(igniteServers.get(0), initParameters);
+
+        for (IgniteServer node : igniteServers) {
+            assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+            if (publicIgnite == null) {
+                publicIgnite = node.api();
+                igniteImpl = unwrapIgniteImpl(publicIgnite);
+            }
+        }
+    }
+
+    @Nullable
+    protected String clusterConfiguration() {
+        return "ignite {}";
+    }
+
+    protected static String nodeName(int port) {
+        return "node_" + port;
+    }
+
+    protected Path workDir() throws Exception {
+        return new File("c:/work/tpcc").toPath();
+    }
+
+    protected int pageMemorySize() {
+        return 2073741824;

Review Comment:
   ```suggestion
           return 2 * Constants.GiB;
   ```
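A side note on this suggestion: 2 GiB is 2^31 bytes, one more than `Integer.MAX_VALUE`, while `pageMemorySize()` returns `int` (the current literal `2073741824` still fits). Assuming `Constants.GiB` is an `int` equal to 2^30 (not verified here), `2 * Constants.GiB` would silently wrap to a negative value; if it is a `long`, the line would not compile against the `int` return type. The sketch below uses a local stand-in constant `GIB` to show the difference; widening to `long` keeps the exact value, so the method may need a `long` return type.

```java
/** Illustrates the int range issue; GIB is a local stand-in, not the real Constants field. */
final class PageMemorySizeSketch {
    // Hypothetical stand-in: assumes the real constant is an int equal to 1024 * 1024 * 1024.
    static final int GIB = 1024 * 1024 * 1024;

    /** Overflows: 2 * 2^30 = 2^31 does not fit into an int and wraps to Integer.MIN_VALUE. */
    static int pageMemorySizeAsInt() {
        return 2 * GIB;
    }

    /** Widening one operand to long before multiplying keeps the exact value. */
    static long pageMemorySizeAsLong() {
        return 2L * GIB;
    }
}
```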



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java:
##########
@@ -72,6 +72,23 @@ public UUID txId() {
         return txId;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Lock lock = (Lock) o;
+        return Objects.equals(lockKey, lock.lockKey) && lockMode == lock.lockMode && Objects.equals(txId, lock.txId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(lockKey, lockMode, txId);

Review Comment:
   Please use an explicit implementation. The lock hash is performance-sensitive, and `Objects.hash` introduces unintended overhead by allocating a varargs array on every call.
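A minimal sketch of an explicit `hashCode`, using hypothetical stand-ins `SketchLock` and `SketchLockMode` for `Lock` and `LockMode` so that it compiles on its own. It folds the three fields with the same 31-based formula that `Objects.hash` uses, so the values are identical, but no array is allocated.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for LockMode. */
enum SketchLockMode { S, X }

/** Hypothetical stand-in for Lock with the same three fields. */
final class SketchLock {
    private final Object lockKey; // stands in for LockKey
    private final SketchLockMode lockMode;
    private final UUID txId;

    SketchLock(Object lockKey, SketchLockMode lockMode, UUID txId) {
        this.lockKey = lockKey;
        this.lockMode = lockMode;
        this.txId = txId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SketchLock lock = (SketchLock) o;
        return Objects.equals(lockKey, lock.lockKey) && lockMode == lock.lockMode && Objects.equals(txId, lock.txId);
    }

    @Override
    public int hashCode() {
        // Same combination as Objects.hash(lockKey, lockMode, txId), without the varargs array.
        int result = 1;
        result = 31 * result + (lockKey == null ? 0 : lockKey.hashCode());
        result = 31 * result + (lockMode == null ? 0 : lockMode.hashCode());
        result = 31 * result + (txId == null ? 0 : txId.hashCode());
        return result;
    }
}
```

If the fields are guaranteed to be non-null, the null checks can be dropped as well.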



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedWaitDieDeadlockPreventionTest.java:
##########
@@ -77,10 +64,16 @@ public void youngNormalTxShouldBeAborted() {
         var tx1 = beginTx(TxPriority.LOW);
         var tx2 = beginTx(TxPriority.NORMAL);
 
-        var key1 = key("test");
+        var key1 = lockKey("test");
 
         assertThat(xlock(tx1, key1), willSucceedFast());
 
         assertThat(xlock(tx2, key1), willThrow(LockException.class));
     }
+
+    @Override
+    @Test
+    public void testNonFair() {
+        super.testNonFair();
+    }

Review Comment:
   I don't think I understand what's happening here. Is the test disabled in the superclass? If so, this override should probably have a comment explaining why.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WaitDieDeadlockPreventionPolicy.java:
##########
@@ -46,4 +47,19 @@ public Comparator<UUID> txIdComparator() {
     public long waitTimeout() {
         return 0;
     }
+
+    @Override
+    public final Waiter allowWait(Waiter waiter, Waiter owner) {
+        int res = txIdComparator().compare(waiter.txId(), owner.txId());

Review Comment:
   I have the same comment here as before; it also applies to `WoundWaitDeadlockPreventionPolicy`. Does that make sense? What do you think?



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java:
##########
@@ -227,6 +232,11 @@ public void retryImplicitTransactionsDueToReplicaMissTest() {
 
     @Test
     public void retryAfterLockFailureInSameTransaction() {
+        IgniteImpl ignite = node0();
+        if (!ignite.txManager().lockManager().policy().reverse()) {
+            return; // This test scenario is only applicable to reversed policy.
+        }

Review Comment:
   ```suggestion
           assumeTrue(ignite.txManager().lockManager().policy().reverse(), "This test scenario is only applicable to reversed policy.");
   ```



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java:
##########
@@ -150,4 +169,1220 @@ public void testCompatibilityLockMapSizePropertyNameWasNotChanged() {
                 LOCK_MAP_SIZE_PROPERTY_NAME
         );
     }
+
+    @Test
+    public void testSingleKeyWrite() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+
+        assertTrue(fut0.isDone());
+
+        Collection<UUID> queue = lockManager.queue(key);
+
+        assertTrue(queue.size() == 1 && queue.iterator().next().equals(txId1));
+
+        Waiter waiter = lockManager.waiter(key, txId1);
+
+        assertTrue(waiter.locked());
+
+        lockManager.release(fut0.join());
+    }
+
+    @Test
+    public void testSingleKeyWriteLock() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
+
+        assertTrue(fut0.isDone());
+
+        assertTrue(txId2.compareTo(txId1) > 0);
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+
+        assertFalse(fut1.isDone());
+
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertFalse(lockManager.waiter(key, txId1).locked());
+
+        lockManager.release(fut0.join());
+
+        assertTrue(fut1.isDone());
+
+        assertNull(lockManager.waiter(key, txId2));
+        assertTrue(lockManager.waiter(key, txId1).locked());
+
+        lockManager.release(fut1.join());
+
+        assertNull(lockManager.waiter(key, txId2));
+        assertNull(lockManager.waiter(key, txId1));
+    }
+
+    @Test
+    public void downgradeLockOutOfTurnTest() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        lockManager.acquire(txId0, key, S).join();
+        Lock lock = lockManager.acquire(txId2, key, S).join();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut0.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+        expectConflict(fut2);
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+        fut1.join();
+
+        assertFalse(fut0.isDone());
+
+        lockManager.release(lock);
+        fut0.thenAccept(l -> lockManager.release(l));

Review Comment:
   We should probably wait for this future and then assert that the lock state is as expected. The test doesn't look complete.
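To illustrate why the trailing `thenAccept` leaves the test incomplete: if the callback fails, the failure is only recorded in the future that `thenAccept` returns, and nobody observes it. The stdlib-only sketch below uses a `String` as a stand-in for `Lock` and a callback that always throws (both hypothetical).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AwaitCallbackSketch {
    /**
     * Mirrors the fire-and-forget pattern: the callback runs, but its failure is only
     * captured in the returned future, which the caller never inspects.
     */
    static CompletableFuture<Void> releaseWithoutWaiting(CompletableFuture<String> lockFut) {
        return lockFut.thenAccept(lock -> {
            throw new IllegalStateException("release failed for " + lock);
        });
    }

    /** Joining surfaces the failure to the caller, so a test would fail instead of passing silently. */
    static boolean failureSurfacesOnJoin(CompletableFuture<String> lockFut) {
        try {
            releaseWithoutWaiting(lockFut).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }
}
```

In the test itself, the equivalent would be something like `lockManager.release(fut0.join());` followed by assertions on the lock manager state, as other tests in this file already do.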



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java:
##########
@@ -88,6 +89,14 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return S.toString(LockKey.class, this, "ctx", contextId, "key", key);
+        return S.toString(LockKey.class, this, "ctx", contextId, "key", dump(key));
+    }
+
+    private static String dump(Object key) {
+        if (key instanceof ByteBuffer) {
+            return Arrays.toString(((ByteBuffer) key).array());

Review Comment:
   We already have `org.apache.ignite.internal.util.IgniteUtils#byteBufferToByteArray`, but it allocates a new array. Maybe we need to introduce a more efficient `toString` variant in a separate issue. This code should reuse existing methods instead of copy-pasting code.
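For the follow-up issue, one possible shape of a copy-free variant, sketched under a hypothetical class name (not an existing Ignite utility). Unlike `array()`, it formats only the bytes between `position()` and `limit()`, does not move the position, and works for direct and read-only buffers, where `array()` throws. The output format matches `Arrays.toString(byte[])`.

```java
import java.nio.ByteBuffer;

final class ByteBufferDumpSketch {
    /**
     * Formats the remaining bytes of {@code buf} like Arrays.toString(byte[]), using absolute
     * get(int) so that nothing is copied and the buffer's position is left untouched.
     */
    static String dump(ByteBuffer buf) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = buf.position(); i < buf.limit(); i++) {
            if (i > buf.position()) {
                sb.append(", ");
            }
            sb.append(buf.get(i)); // byte widens to int, printed as a signed number
        }
        return sb.append(']').toString();
    }
}
```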



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10800;
+    private static final int BASE_REST_PORT = 10300;
+
+    private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+    protected static Ignite publicIgnite;
+    protected static IgniteImpl igniteImpl;
+
+    public static void main(String[] args) throws Exception {
+        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+        runner.startCluster();
+    }
+
+    public IgniteImpl node(int idx) {
+        return unwrapIgniteImpl(igniteServers.get(idx).api());
+    }
+
+    private void startCluster() throws Exception {
+        Path workDir = workDir();
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        @Language("HOCON")
+        String configTemplate = "ignite {\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  storage.profiles: {"
+                + "        " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+                + "        " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " + pageMemorySize() + " "
+                + "  },\n"
+                + "  clientConnector: { port:{} },\n"
+                + "  clientConnector.sendServerExceptionStackTraceToClient: true\n"
+                + "  rest.port: {},\n"
+                + "  raft.fsync = " + fsync() + ",\n"
+                + "  system.partitionsLogPath = \"" + logPath() + "\",\n"
+                + "  failureHandler.handler: {\n"
+                + "      type: \"" + StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
+                + "      tryStop: true,\n"
+                + "      timeoutMillis: 60000,\n" // 1 minute for graceful shutdown
+                + "  },\n"
+                + "}";
+
+        for (int i = 0; i < nodes(); i++) {
+            int port = BASE_PORT + i;
+            String nodeName = nodeName(port);
+
+            String config = IgniteStringFormatter.format(configTemplate, port, connectNodeAddr,
+                    BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+            igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName, config, workDir.resolve(nodeName)));
+        }
+
+        String metaStorageNodeName = nodeName(BASE_PORT);
+
+        InitParameters initParameters = InitParameters.builder()
+                .metaStorageNodeNames(metaStorageNodeName)
+                .clusterName("cluster")
+                .clusterConfiguration(clusterConfiguration())
+                .build();
+
+        TestIgnitionManager.init(igniteServers.get(0), initParameters);
+
+        for (IgniteServer node : igniteServers) {
+            assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+            if (publicIgnite == null) {
+                publicIgnite = node.api();
+                igniteImpl = unwrapIgniteImpl(publicIgnite);
+            }
+        }
+    }
+
+    @Nullable
+    protected String clusterConfiguration() {
+        return "ignite {}";
+    }
+
+    protected static String nodeName(int port) {
+        return "node_" + port;
+    }
+
+    protected Path workDir() throws Exception {
+        return new File("c:/work/tpcc").toPath();

Review Comment:
   Yes, this part clearly must be fixed
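One possible direction, sketched under assumptions: read the directory from a system property (the name `tpcc.workDir` is hypothetical) and fall back to a `tpcc` directory under `java.io.tmpdir` instead of a hard-coded Windows path. Subclasses could still override `workDir()` as before.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class WorkDirSketch {
    /** Hypothetical property name; not an existing Ignite setting. */
    static final String WORK_DIR_PROPERTY = "tpcc.workDir";

    /** Uses the property if it is set, otherwise a "tpcc" directory under the JVM temp dir. */
    static Path workDir() {
        String configured = System.getProperty(WORK_DIR_PROPERTY);
        return configured != null
                ? Paths.get(configured)
                : Paths.get(System.getProperty("java.io.tmpdir"), "tpcc");
    }
}
```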



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+    /** Field updater for inflights. */
+    private static final AtomicLongFieldUpdater<CleanupContext> UPDATER = newUpdater(CleanupContext.class, "inflights");

Review Comment:
   The name of this constant should reflect the name of the field it updates. I recommend renaming it to `INFLIGHTS_UPDATER`, for example.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -357,16 +398,52 @@ private LockState adjustLockState(LockState state, LockState v) {
         }
     }
 
-    private void track(UUID txId, Releasable val) {
+    private void seal(UUID txId) {
+        txMap.compute(txId, (k, v) -> {
+            if (v == null) {
+                return null;
+            }
+
+            v.sealed = true;
+
+            return v;
+        });
+    }
+
+    private boolean sealed(UUID txId) {
+        boolean[] ret = {false};
+        txMap.compute(txId, (k, v) -> {
+            if (v == null) {
+                return null;
+            }
+
+            if (v.sealed) {
+                ret[0] = true;
+            }
+
+            return v;
+        });

Review Comment:
   Is it possible to avoid "writing" into a concurrent map on read operations? I believe a simple `get` should be enough here if we make `v.sealed` volatile. This code would then become non-blocking (I think), and it would certainly be more efficient.
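A minimal sketch of that idea, with a simplified hypothetical `TxState` standing in for the real map value: `sealed` becomes `volatile`, the write path uses `computeIfPresent` (equivalent to the current `compute` that returns `null` for an absent entry), and the read path is a plain `get`. One caveat not checked here: a `get` is not serialized with a concurrent `compute` on the same key, so whether that ordering matters depends on the callers of `sealed`.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class SealedFlagSketch {
    /** Simplified hypothetical stand-in for the per-transaction value stored in txMap. */
    static final class TxState {
        volatile boolean sealed; // volatile so that a plain get() observes the latest write
    }

    private final ConcurrentHashMap<UUID, TxState> txMap = new ConcurrentHashMap<>();

    void register(UUID txId) {
        txMap.putIfAbsent(txId, new TxState());
    }

    /** No-op when the transaction is absent, like the compute-based version. */
    void seal(UUID txId) {
        txMap.computeIfPresent(txId, (k, v) -> {
            v.sealed = true;
            return v;
        });
    }

    /** Read path: a lock-free get() plus a volatile read, with no write into the map. */
    boolean sealed(UUID txId) {
        TxState v = txMap.get(txId);
        return v != null && v.sealed;
    }
}
```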



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+    /** Field updater for inflights. */
+    private static final AtomicLongFieldUpdater<CleanupContext> UPDATER = newUpdater(CleanupContext.class, "inflights");
+
+    /** Txn contexts. */
+    private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+    /**
+     * Registers the inflight for a transaction.
+     *
+     * @param txId The transaction id.
+     * @param testPred Test predicate.
+     * @param requestType Request type.
+     *
+     * @return Cleanup context.
+     */
+    @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID> testPred, RequestType requestType) {
+        boolean[] res = {true};
+
+        CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = new CleanupContext();
+            }
+
+            if (ctx.finishFut != null || testPred.test(txId)) {
+                res[0] = false;
+            } else {
+                UPDATER.incrementAndGet(ctx);
+                if (requestType.isWrite()) {
+                    ctx.hasWrites = true;
+                }
+            }
+
+            return ctx;
+        });
+
+        return res[0] ? ctx0 : null;
+    }
+
+    /**
+     * Runs a closure under a transaction lock.
+     *
+     * @param txId Transaction id.
+     * @param r Runnable.
+     */
+    public void runClosure(UUID txId, Runnable r) {
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            r.run();
+
+            return ctx;
+        });
+    }
+
+    /**
+     * Unregisters the inflight for a transaction.
+     *
+     * @param ctx Cleanup context.
+     */
+    static void removeInflight(CleanupContext ctx) {
+        long val = UPDATER.decrementAndGet(ctx);
+
+        if (ctx.finishFut != null && val == 0) {
+            ctx.finishFut.complete(null);
+        }
+    }
+
+    /**
+     * Get finish future.

Review Comment:
   It's not just a `get`; as far as I can see, it also changes internal state.



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java:
##########
@@ -150,4 +169,1220 @@ public void testCompatibilityLockMapSizePropertyNameWasNotChanged() {
                 LOCK_MAP_SIZE_PROPERTY_NAME
         );
     }
+
+    @Test
+    public void testSingleKeyWrite() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+
+        assertTrue(fut0.isDone());
+
+        Collection<UUID> queue = lockManager.queue(key);
+
+        assertTrue(queue.size() == 1 && queue.iterator().next().equals(txId1));
+
+        Waiter waiter = lockManager.waiter(key, txId1);
+
+        assertTrue(waiter.locked());
+
+        lockManager.release(fut0.join());
+    }
+
+    @Test
+    public void testSingleKeyWriteLock() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
+
+        assertTrue(fut0.isDone());
+
+        assertTrue(txId2.compareTo(txId1) > 0);
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+
+        assertFalse(fut1.isDone());
+
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertFalse(lockManager.waiter(key, txId1).locked());
+
+        lockManager.release(fut0.join());
+
+        assertTrue(fut1.isDone());
+
+        assertNull(lockManager.waiter(key, txId2));
+        assertTrue(lockManager.waiter(key, txId1).locked());
+
+        lockManager.release(fut1.join());
+
+        assertNull(lockManager.waiter(key, txId2));
+        assertNull(lockManager.waiter(key, txId1));
+    }
+
+    @Test
+    public void downgradeLockOutOfTurnTest() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        lockManager.acquire(txId0, key, S).join();
+        Lock lock = lockManager.acquire(txId2, key, S).join();
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut0.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+        expectConflict(fut2);
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+        fut1.join();
+
+        assertFalse(fut0.isDone());
+
+        lockManager.release(lock);
+        fut0.thenAccept(l -> lockManager.release(l));
+    }
+
+    @Test
+    public void upgradeLockImmediatelyTest() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, IS);
+        assertTrue(fut.isDone());
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, IS);
+        assertTrue(fut0.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, IS);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, key, IX);
+        assertTrue(fut2.isDone());
+
+        lockManager.release(fut1.join());
+    }
+
+    @Test
+    public void testSingleKeyReadWriteLock() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        UUID txId3 = TestTransactionIds.newTransactionId();
+        assertTrue(txId3.compareTo(txId2) > 0);
+        assertTrue(txId2.compareTo(txId1) > 0);
+        assertTrue(txId1.compareTo(txId0) > 0);
+        LockKey key = lockKey();
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, key, S);
+        assertTrue(fut3.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut0.isDone());
+
+        assertTrue(lockManager.waiter(key, txId3).locked());
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertTrue(lockManager.waiter(key, txId1).locked());
+        assertFalse(lockManager.waiter(key, txId0).locked());
+
+        lockManager.release(fut1.join());
+
+        assertTrue(lockManager.waiter(key, txId3).locked());
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertNull(lockManager.waiter(key, txId1));
+        assertFalse(lockManager.waiter(key, txId0).locked());
+
+        lockManager.release(fut3.join());
+
+        assertNull(lockManager.waiter(key, txId3));
+        assertTrue(lockManager.waiter(key, txId2).locked());
+        assertNull(lockManager.waiter(key, txId1));
+        assertFalse(lockManager.waiter(key, txId0).locked());
+
+        lockManager.release(fut2.join());
+
+        assertNull(lockManager.waiter(key, txId3));
+        assertNull(lockManager.waiter(key, txId2));
+        assertNull(lockManager.waiter(key, txId1));
+        assertTrue(lockManager.waiter(key, txId0).locked());
+    }
+
+    @Test
+    public void testSingleKeyReadWriteConflict() {
+        UUID txId0 = TestTransactionIds.newTransactionId();
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        LockKey key = lockKey();
+
+        // Lock in order
+        CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, S);
+        assertTrue(fut0.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
+        assertFalse(fut1.isDone());
+
+        lockManager.release(fut0.join());
+        assertTrue(fut1.isDone());
+
+        lockManager.release(fut1.join());
+
+        assertTrue(lockManager.queue(key).isEmpty());
+
+        // Lock not in order
+        fut0 = lockManager.acquire(txId0, key, S);
+        assertTrue(fut0.isDone());
+
+        try {
+            lockManager.acquire(txId1, key, X).join();
+
+            fail();
+        } catch (CompletionException e) {
+            // Expected.
+        }

Review Comment:
   With a static import, of course; I mentioned the class name only for the comment's clarity.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+    /** Field updater for inflights. */
+    private static final AtomicLongFieldUpdater<CleanupContext> UPDATER = newUpdater(CleanupContext.class, "inflights");
+
+    /** Txn contexts. */
+    private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+    /**
+     * Registers the inflight for a transaction.
+     *
+     * @param txId The transaction id.
+     * @param testPred Test predicate.
+     * @param requestType Request type.
+     *
+     * @return Cleanup context.
+     */
+    @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID> testPred, RequestType requestType) {
+        boolean[] res = {true};
+
+        CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = new CleanupContext();
+            }
+
+            if (ctx.finishFut != null || testPred.test(txId)) {
+                res[0] = false;
+            } else {
+                UPDATER.incrementAndGet(ctx);
+                if (requestType.isWrite()) {
+                    ctx.hasWrites = true;
+                }
+            }
+
+            return ctx;
+        });
+
+        return res[0] ? ctx0 : null;
+    }
+
+    /**
+     * Runs a closure under a transaction lock.
+     *
+     * @param txId Transaction id.
+     * @param r Runnable.
+     */
+    public void runClosure(UUID txId, Runnable r) {
+        txCtxMap.compute(txId, (uuid, ctx) -> {

Review Comment:
   It synchronizes the entire bucket, not just a single key. If you're planning to use this method frequently, should we make the load factor smaller, or maybe consider more fine-grained manual locking?
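A minimal sketch of the finer-grained alternative, assuming lock striping is acceptable here. `StripedTxLocks` and the stripe count are hypothetical, not part of the PR: each transaction ID maps to one of N `ReentrantLock`s, so the closure no longer holds a `ConcurrentHashMap` bin lock, and N can be tuned independently of the map's table size. This only preserves the current semantics if every operation that must be mutually exclusive with the closure (for example `addInflight` and `finishFuture`) takes the same stripe lock.

```java
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical lock-striping helper illustrating manual per-stripe locking. */
final class StripedTxLocks {
    private final ReentrantLock[] stripes;

    StripedTxLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    /** Maps a transaction ID to its stripe; the sign bit is masked off so the index is non-negative. */
    private ReentrantLock stripeFor(UUID txId) {
        int h = txId.hashCode();
        return stripes[((h ^ (h >>> 16)) & 0x7fffffff) % stripes.length];
    }

    /** Runs the closure while holding only the stripe lock of the given transaction. */
    void runClosure(UUID txId, Runnable r) {
        ReentrantLock lock = stripeFor(txId);
        lock.lock();
        try {
            r.run();
        } finally {
            // Always released, even if the closure throws.
            lock.unlock();
        }
    }
}
```

Transactions that hash to the same stripe still contend, so the stripe count plays roughly the role that the table size plays for `compute`; the difference is that map operations are no longer blocked while the closure runs.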



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+    /**
+     * Runs a closure under a transaction lock.
+     *
+     * @param txId Transaction id.
+     * @param r Runnable.
+     */
+    public void runClosure(UUID txId, Runnable r) {
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            r.run();
+
+            return ctx;
+        });
+    }
+
+    /**
+     * Unregisters the inflight for a transaction.
+     *
+     * @param ctx Cleanup context.
+     */
+    static void removeInflight(CleanupContext ctx) {
+        long val = UPDATER.decrementAndGet(ctx);
+
+        if (ctx.finishFut != null && val == 0) {
+            ctx.finishFut.complete(null);
+        }
+    }
+
+    /**
+     * Get finish future.
+     *
+     * @param txId Transaction id.
+     * @return The future.
+     */
+    public @Nullable CleanupContext finishFuture(UUID txId) {
+        return txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                return null;
+            }
+
+            if (ctx.finishFut == null) {
+                ctx.finishFut = UPDATER.get(ctx) == 0 ? nullCompletedFuture() : new CompletableFuture<>();
+            }
+
+            // Avoiding a data race with a concurrent decrementing thread, which might not see finishFut publication.
+            if (UPDATER.get(ctx) == 0 && !ctx.finishFut.isDone()) {
+                ctx.finishFut = nullCompletedFuture();
+            }

Review Comment:
   In this case we're ignoring the `new CompletableFuture<>()` that we just allocated, right? Is there a chance of this future leaking outside of this method?
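One way to avoid discarding a freshly allocated future, sketched on a hypothetical, simplified model (`InflightsSketch`, `Ctx`) and assuming both the counter and `finishFut` are `volatile`: publish the future once and, on the re-check, complete that same instance instead of replacing it. The finishing thread writes `finishFut` before reading the counter, and `remove` decrements the counter before reading `finishFut`; with volatile accesses, at least one of them observes the other's write. Because `CompletableFuture.complete` is idempotent, it is harmless if both complete it, and no never-completed future is left behind.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/** Hypothetical, simplified model of PartitionInflights; not the PR's actual class. */
final class InflightsSketch {
    /** Per-transaction state; both fields are assumed to be volatile. */
    static final class Ctx {
        volatile long inflights;
        volatile CompletableFuture<Void> finishFut;
    }

    private static final AtomicLongFieldUpdater<Ctx> UPDATER =
            AtomicLongFieldUpdater.newUpdater(Ctx.class, "inflights");

    private final ConcurrentHashMap<UUID, Ctx> txCtxMap = new ConcurrentHashMap<>();

    /** Registers an inflight; returns null if the transaction is already finishing. */
    Ctx add(UUID txId) {
        boolean[] accepted = {true};
        Ctx res = txCtxMap.compute(txId, (id, ctx) -> {
            if (ctx == null) {
                ctx = new Ctx();
            }
            if (ctx.finishFut != null) {
                accepted[0] = false;
            } else {
                UPDATER.incrementAndGet(ctx);
            }
            return ctx;
        });
        return accepted[0] ? res : null;
    }

    /** Unregisters an inflight; completes the finish future if this was the last one. */
    static void remove(Ctx ctx) {
        long val = UPDATER.decrementAndGet(ctx);
        CompletableFuture<Void> fut = ctx.finishFut; // Single volatile read.
        if (fut != null && val == 0) {
            fut.complete(null);
        }
    }

    /** Marks the transaction as finishing; returns null if it has no context. */
    Ctx finish(UUID txId) {
        return txCtxMap.compute(txId, (id, ctx) -> {
            if (ctx == null) {
                return null;
            }
            if (ctx.finishFut == null) {
                // Publish exactly one future; it is never replaced afterwards.
                ctx.finishFut = UPDATER.get(ctx) == 0
                        ? CompletableFuture.completedFuture(null)
                        : new CompletableFuture<>();
            }
            // Re-check after publication: a concurrent remove() may have dropped the counter to 0
            // before it could see finishFut. Completing the same instance is idempotent.
            if (UPDATER.get(ctx) == 0) {
                ctx.finishFut.complete(null);
            }
            return ctx;
        });
    }
}
```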



##########
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -317,6 +317,14 @@ void testManualRebalanceIfMajorityIsLostSpecifyPartitions() throws Exception {
         int anotherPartId = 0;
 
         IgniteImpl node0 = igniteImpl(0);
+
+        if (!node0.txManager().lockManager().policy().reverse()) {

Review Comment:
   I recommend always using `assume*` statements in such situations, as I already mentioned in my previous comments.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/NoWaitDeadlockPreventionPolicy.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.Waiter;
+
+/**
+ * Nowait deadlock prevention policy.
+ */
+public class NoWaitDeadlockPreventionPolicy implements DeadlockPreventionPolicy {
+    /** {@inheritDoc} */

Review Comment:
   These comments are not necessary anymore.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -861,35 +949,114 @@ IgniteBiTuple<CompletableFuture<Void>, LockMode> tryAcquire(UUID txId, LockMode
                     }
                 }
 
-                if (!isWaiterReadyToNotify(waiter, false)) {
-                    if (deadlockPreventionPolicy.waitTimeout() > 0) {
+                notifications = tryAcquireInternal(waiter, prev == null, false);
+            }
+
+            // Callback outside the monitor.
+            for (Runnable r : notifications) {
+                r.run();
+            }
+
+            return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
+        }
+
+        private void failWaiter(WaiterImpl waiter, List<Runnable> notifications, Exception exception) {
+            if (!waiter.locked()) {
+                waiters.remove(waiter.txId());
+            } else if (waiter.hasLockIntent()) {
+                waiter.refuseIntent(); // Reset lock intention.
+            }
+            waiter.fail(exception);
+            notifications.add(waiter::notifyLocked);
+        }
+
+        private List<Runnable> tryAcquireInternal(WaiterImpl waiter, boolean track, boolean unlock) {
+            List<Runnable> notifications = new ArrayList<>();
+
+            if (sealed(waiter.txId)) {
+                failWaiter(waiter, notifications, resolveTransactionSealedException(waiter.txId));
+                return notifications;
+            }
+
+            boolean[] needWait = {false};
+            boolean[] notified = {false};
+
+            findConflicts(waiter, owner -> {
+                assert !waiter.txId.equals(owner.txId);
+                @Nullable WaiterImpl toFail = (WaiterImpl) deadlockPreventionPolicy.allowWait(waiter, owner);

Review Comment:
   Is the cast to `WaiterImpl` necessary? I don't think so; could you please explain this decision?
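If the caller really does need the concrete type back, one way to drop the cast is a generic `allowWait` that returns the same type it receives. A minimal sketch with hypothetical, stripped-down `Waiter`, `Policy` and `WaiterImpl` types (not the PR's interfaces), assuming `allowWait` returns the waiter to fail or `null` if waiting is allowed, and that a greater ID under the comparator means a younger transaction:

```java
import java.util.Comparator;
import java.util.UUID;

/** Hypothetical minimal waiter abstraction. */
interface Waiter {
    UUID txId();
}

/** Hypothetical policy: returns the waiter to fail, or null if waiting is allowed. */
interface Policy {
    Comparator<UUID> txIdComparator();

    <W extends Waiter> W allowWait(W waiter, W owner);
}

/** Reversed wait-die: a younger waiter may wait for an older owner; an older waiter is rejected. */
final class ReversedWaitDiePolicy implements Policy {
    @Override
    public Comparator<UUID> txIdComparator() {
        return Comparator.naturalOrder(); // Assumption: a greater UUID means a younger tx.
    }

    @Override
    public <W extends Waiter> W allowWait(W waiter, W owner) {
        int res = txIdComparator().compare(waiter.txId(), owner.txId());
        return res > 0 ? null : waiter;
    }
}

/** Hypothetical concrete waiter. */
final class WaiterImpl implements Waiter {
    private final UUID txId;

    WaiterImpl(UUID txId) {
        this.txId = txId;
    }

    @Override
    public UUID txId() {
        return txId;
    }
}
```

With this signature, `WaiterImpl toFail = policy.allowWait(waiter, owner);` compiles without a cast because `W` is inferred from the arguments. Whether it fits the real `DeadlockPreventionPolicy` depends on how its other callers use the return value.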



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionIds.java:
##########
@@ -86,4 +86,12 @@ private static long combine(int nodeId, TxPriority priority) {
         // Shift the int 32 bits and combine with the boolean
         return ((long) nodeId << 32) | priorityAsInt;
     }
+
+    public static int hash(UUID txId, int divisor) {
+        return Math.floorMod(spread(txId.hashCode()), divisor);

Review Comment:
   `spread` already returns a non-negative value, so why don't we use `%` here?
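`%` and `Math.floorMod` only differ when the dividend is negative (for a positive divisor). A small check, assuming `spread` masks off the sign bit the way `ConcurrentHashMap.spread` does; the body of `TransactionIds.spread` is not visible in this diff, so `TxHashSketch` is hypothetical:

```java
import java.util.UUID;

/** Hypothetical illustration; TransactionIds.spread()'s real body is not shown in this diff. */
final class TxHashSketch {
    private TxHashSketch() {
    }

    /** Modeled on ConcurrentHashMap.spread(): the sign bit is masked off, so the result is >= 0. */
    static int spread(int h) {
        return (h ^ (h >>> 16)) & 0x7fffffff;
    }

    static int hashFloorMod(UUID txId, int divisor) {
        return Math.floorMod(spread(txId.hashCode()), divisor);
    }

    /** For a non-negative dividend and a positive divisor, % already lands in [0, divisor). */
    static int hashRemainder(UUID txId, int divisor) {
        return spread(txId.hashCode()) % divisor;
    }
}
```

For a negative dividend they diverge: `-7 % 3` is `-1`, while `Math.floorMod(-7, 3)` is `2`.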



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReversedWaitDieDeadlockPreventionPolicy.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.Waiter;
+
+/**
+ * Reversed wait die implementation. Same as wait die, but reverses the wait order: younger is allowed to wait for older, older is rejected
+ * if conflicts with younger.
+ */
+public class ReversedWaitDieDeadlockPreventionPolicy implements DeadlockPreventionPolicy {
+    private static final TxIdPriorityComparator TX_ID_PRIORITY_COMPARATOR = new TxIdPriorityComparator();
+
+    /** {@inheritDoc} */
+    @Override
+    public final Comparator<UUID> txIdComparator() {
+        return TX_ID_PRIORITY_COMPARATOR;
+    }
+
+    @Override
+    public Waiter allowWait(Waiter waiter, Waiter owner) {
+        int res = txIdComparator().compare(waiter.txId(), owner.txId());

Review Comment:
   I'm not totally convinced that my comment makes sense, but I would rather see a static call here:
   ```
   int res = TxIdPriorityComparator.compare(waiter.txId(), owner.txId());
   ...
   ```
   Comparison is a potentially hot operation, and here we have too many virtual calls when all we need, for the most part, is to compare a few numeric values.
   
   `TxIdPriorityComparator` itself doesn't look optimized either; we should do something about that too, in a separate Jira, I presume.
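A rough sketch of what a static comparison could look like. `TxIdOrder` and `ReversedWaitDieCheck` are hypothetical; the real `TxIdPriorityComparator` ordering rules are not shown in this PR, so the sketch simply assumes most significant bits first, then least significant bits, both compared as unsigned longs, and that a greater ID means a younger transaction. A `Comparator` view is kept for APIs that need an object.

```java
import java.util.Comparator;
import java.util.UUID;

/** Hypothetical static comparison helper; the ordering rules are an assumption. */
final class TxIdOrder {
    /** Comparator view for APIs that need an object; delegates to the static method. */
    static final Comparator<UUID> COMPARATOR = TxIdOrder::compare;

    private TxIdOrder() {
    }

    static int compare(UUID a, UUID b) {
        int c = Long.compareUnsigned(a.getMostSignificantBits(), b.getMostSignificantBits());
        return c != 0 ? c : Long.compareUnsigned(a.getLeastSignificantBits(), b.getLeastSignificantBits());
    }
}

/** Hypothetical reversed wait-die check using the static call instead of txIdComparator(). */
final class ReversedWaitDieCheck {
    private ReversedWaitDieCheck() {
    }

    /** Returns true if the waiter may wait, assuming a greater ID means a younger transaction. */
    static boolean mayWait(UUID waiterTxId, UUID ownerTxId) {
        // Statically bound call: no Comparator interface dispatch, just two long comparisons.
        return TxIdOrder.compare(waiterTxId, ownerTxId) > 0;
    }
}
```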



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
