Copilot commented on code in PR #7799:
URL: https://github.com/apache/ignite-3/pull/7799#discussion_r2944806848


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java:
##########
@@ -308,4 +315,16 @@ private static Map<ZonePartitionIdMessage, 
PartitionEnlistmentMessage> toEnliste
 
         return messages;
     }
+
+    /**
+     * Sends a message to kill a transaction to it's coordinator.
+     *

Review Comment:
   The Javadoc says "Sends a message to kill a transaction to it's 
coordinator." — "it's" should be "its" (possessive, not a contraction).



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10800;
+    private static final int BASE_REST_PORT = 10300;
+
+    private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+    protected static Ignite publicIgnite;
+    protected static IgniteImpl igniteImpl;
+
+    public static void main(String[] args) throws Exception {
+        TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+        runner.startCluster();
+    }
+
+    public IgniteImpl node(int idx) {
+        return unwrapIgniteImpl(igniteServers.get(idx).api());
+    }
+
+    private void startCluster() throws Exception {
+        Path workDir = workDir();
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        @Language("HOCON")
+        String configTemplate = "ignite {\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  storage.profiles: {"
+                + "        " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+                + "        " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " + 
pageMemorySize() + " "
+                + "  },\n"
+                + "  clientConnector: { port:{} },\n"
+                + "  clientConnector.sendServerExceptionStackTraceToClient: 
true\n"
+                + "  rest.port: {},\n"
+                + "  raft.fsync = " + fsync() + ",\n"
+                + "  system.partitionsLogPath = \"" + logPath() + "\",\n"
+                + "  failureHandler.handler: {\n"
+                + "      type: \"" + 
StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
+                + "      tryStop: true,\n"
+                + "      timeoutMillis: 60000,\n" // 1 minute for graceful 
shutdown
+                + "  },\n"
+                + "}";
+
+        for (int i = 0; i < nodes(); i++) {
+            int port = BASE_PORT + i;
+            String nodeName = nodeName(port);
+
+            String config = IgniteStringFormatter.format(configTemplate, port, 
connectNodeAddr,
+                    BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+            
igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName, 
config, workDir.resolve(nodeName)));
+        }
+
+        String metaStorageNodeName = nodeName(BASE_PORT);
+
+        InitParameters initParameters = InitParameters.builder()
+                .metaStorageNodeNames(metaStorageNodeName)
+                .clusterName("cluster")
+                .clusterConfiguration(clusterConfiguration())
+                .build();
+
+        TestIgnitionManager.init(igniteServers.get(0), initParameters);
+
+        for (IgniteServer node : igniteServers) {
+            assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+            if (publicIgnite == null) {
+                publicIgnite = node.api();
+                igniteImpl = unwrapIgniteImpl(publicIgnite);
+            }
+        }
+    }
+
+    @Nullable
+    protected String clusterConfiguration() {
+        return "ignite {}";
+    }
+
+    protected static String nodeName(int port) {
+        return "node_" + port;
+    }
+
+    protected Path workDir() throws Exception {
+        return new File("c:/work/tpcc").toPath();

Review Comment:
   The `TpccBenchmarkNodeRunner` contains a hardcoded Windows path 
(`c:/work/tpcc`). This class appears to be a developer-local benchmark runner 
and shouldn't be committed to the repository.
   



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WoundWaitDeadlockPreventionPolicy.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.Waiter;
+
+/**
+ * Wound-wait prevention policy. TODO desc.
+ */

Review Comment:
   Javadoc says "TODO desc." — the class documentation is incomplete.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -1203,6 +1238,46 @@ public CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGr
         });
     }
 
+    @Override
+    public <T> T runInTransaction(Function<Transaction, T> clo, 
HybridTimestampTracker observableTimestampTracker,
+            @Nullable TransactionOptions options) {
+        boolean readOnly = options != null && options.readOnly();
+
+        InternalTxOptions internalTxOptions = options == null
+                ? InternalTxOptions.defaults()
+                : InternalTxOptions.builder()
+                        .timeoutMillis(options.timeoutMillis())
+                        .txLabel(options.label())
+                        .build();
+
+        long startTimestamp = IgniteUtils.monotonicMs();
+        long timeout = getTimeoutOrDefault(internalTxOptions, 
txConfig.readWriteTimeoutMillis().value());
+        long initialTimeout = startTimestamp + timeout;
+
+        return runInTransactionInternal(old -> {
+            InternalTxOptions opts;
+            if (old != null) {
+//                InternalTransaction oldInt = (InternalTransaction) old;
+//                UUID id = oldInt.id();
+//
+//                int cnt = TransactionIds.retryCnt(id);
+//                int nodeId = TransactionIds.nodeId(id);
+//                TxPriority priority = TransactionIds.priority(id);
+//                UUID retryId = 
TransactionIds.transactionId(id.getMostSignificantBits(), cnt + 1, nodeId, 
priority);
+
+                opts = 
InternalTxOptions.builder().priority(internalTxOptions.priority())
+                        //.retryId(retryId)
+                        .timeoutMillis(timeout) // TODO
+                        .txLabel(internalTxOptions.txLabel()).build();
+
+                //LOG.info("Restarting the transaction [oldId=" + id + ", 
newId=" + retryId + ", remaining=" + opts.timeoutMillis());
+            } else {
+                opts = internalTxOptions;
+            }
+            return beginExplicit(observableTimestampTracker, readOnly, opts);
+        }, clo, startTimestamp, initialTimeout);

Review Comment:
   The `runInTransaction` method contains significant amounts of commented-out 
code (lines 1260-1266, 1269, 1273). This looks like incomplete/experimental 
work that shouldn't be merged as-is.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+    /** Txn contexts. */
+    private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new 
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+    /**
+     * Registers the inflight update for a transaction.
+     *
+     * @param txId The transaction id.
+     */
+    public @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID> 
testPred, RequestType requestType) {
+        boolean[] res = {true};
+
+        CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = new CleanupContext();
+            }
+
+            //ctx.opFuts.add(new IgniteBiTuple<>(new Exception(), fut));
+
+            if (ctx.finishFut != null || testPred.test(txId)) {
+                res[0] = false;
+            } else {
+                //ctx.addInflight();
+                ctx.inflights.incrementAndGet();
+                ctx.hasWrites = true;
+            }
+
+            return ctx;
+        });
+
+        return res[0] ? ctx0 : null;
+    }
+
+    /**
+     * Unregisters the inflight for a transaction.
+     *
+     * @param ctx Cleanup context.
+     */
+    public static void removeInflight(CleanupContext ctx) {
+        long val = ctx.inflights.decrementAndGet();
+
+        if (ctx.finishFut != null && val == 0) {
+            ctx.finishFut.complete(null);
+        }
+    }
+
+    /**
+     * Get finish future.
+     *
+     * @param txId Transaction id.
+     * @return The future.
+     */
+    public @Nullable CleanupContext finishFuture(UUID txId) {
+        return txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                return null;
+            }
+
+            // LOG.info("DBG: finishFuture " + txId + " " + ctx.inflights);
+
+            if (ctx.finishFut == null) {
+                ctx.finishFut = ctx.inflights.get() == 0 ? 
nullCompletedFuture() : new CompletableFuture<>();
+            }
+
+            // Avoiding a data race with a concurrent decrementing thread, 
which might not see finishFut publication.
+            if (ctx.inflights.get() == 0 && !ctx.finishFut.isDone()) {
+                ctx.finishFut = nullCompletedFuture();
+            }
+
+            return ctx;
+        });
+    }
+
+    /**
+     * Cleanup inflights context for this transaction.
+     *
+     * @param uuid Tx id.
+     */
+    public void erase(UUID uuid) {
+        txCtxMap.remove(uuid);
+    }
+
+    /**
+     * Check if the inflights map contains a given transaction.
+     *
+     * @param txId Tx id.
+     * @return {@code True} if contains.
+     */
+    public boolean contains(UUID txId) {
+        return txCtxMap.containsKey(txId);
+    }
+
+    /**
+     * Shared Cleanup context.
+     */
+    public static class CleanupContext {
+        volatile CompletableFuture<Void> finishFut;
+        AtomicLong inflights = new AtomicLong(0); // TODO atomic updater
+        volatile boolean hasWrites = false;
+
+//        void addInflight() {
+//            inflights.incrementAndGet();
+//        }
+//
+//        void removeInflight(UUID txId) {
+//            //assert inflights > 0 : format("No inflights, cannot remove any 
[txId={}, ctx={}]", txId, this);
+//
+//            inflights.decrementAndGet();
+//        }
+    }
+
+    @TestOnly
+    public ConcurrentHashMap<UUID, CleanupContext> map() {
+        return txCtxMap;
+    }
+}

Review Comment:
   `PartitionInflights` has commented-out code (lines 54, 59, 137-145) and a TODO 
comment (line 134). The class also exposes mutable internal state via `map()` 
annotated `@TestOnly`, and the public `CleanupContext` nested class has non-private 
(package-private) mutable fields (`finishFut`, `inflights`, `hasWrites`) instead of 
proper encapsulation. This needs cleanup before merge.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java:
##########
@@ -80,4 +83,18 @@ public InternalTransaction beginImplicit(boolean readOnly) {
     public Transaction beginWithPriority(boolean readOnly, TxPriority 
priority) {
         return txManager.beginExplicit(observableTimestampTracker, readOnly, 
InternalTxOptions.defaultsWithPriority(priority));
     }
+
+    @Override
+    public <T> T runInTransaction(Function<Transaction, T> clo, @Nullable 
TransactionOptions options) throws TransactionException {
+        return txManager.runInTransaction(clo, observableTimestampTracker, 
options);
+    }
+
+    @Override
+    public <T> CompletableFuture<T> runInTransactionAsync(
+            Function<Transaction, CompletableFuture<T>> clo,
+            @Nullable TransactionOptions options
+    ) {
+        //return txManager.runInTransaction(clo, observableTimestampTracker, 
tx);
+        return CompletableFuture.failedFuture(new Exception());

Review Comment:
   `runInTransactionAsync` returns a `failedFuture(new Exception())` — this is 
a stub that will break any caller of this API. Either implement it properly or 
throw `UnsupportedOperationException` with a clear message.
   



##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java:
##########
@@ -108,4 +110,15 @@ private static ClientTransaction readTx(
 
         return new ClientTransaction(r.clientChannel(), ch, id, isReadOnly, 
EMPTY, null, EMPTY, null, timeout);
     }
+
+    @Override
+    public <T> T runInTransaction(Function<Transaction, T> clo, @Nullable 
TransactionOptions options) throws TransactionException {
+        throw new IllegalArgumentException();
+    }
+
+    @Override
+    public <T> CompletableFuture<T> 
runInTransactionAsync(Function<Transaction, CompletableFuture<T>> clo,
+            @Nullable TransactionOptions options) {
+        throw new IllegalArgumentException();

Review Comment:
   The `ClientTransactions` implementation throws `IllegalArgumentException` 
instead of providing a proper implementation or throwing 
`UnsupportedOperationException`. This will produce confusing errors for thin 
client users.
   



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/LockWaiterMatcher.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.test;
+
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.tx.Lock;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+public class LockWaiterMatcher extends 
TypeSafeMatcher<CompletableFuture<Lock>> {
+    private final UUID waiterId;
+    private CompletableFuture<Lock> item;
+
+    private LockWaiterMatcher(UUID txId) {
+        this.waiterId = txId;
+    }
+
+    @Override
+    protected boolean matchesSafely(CompletableFuture<Lock> item) {
+        try {
+            this.item = item;
+            item.get(50, TimeUnit.MILLISECONDS);
+            return false; // Timeout exception is expected.
+        } catch (TimeoutException e) {
+            return true;
+        } catch (InterruptedException | ExecutionException | 
CancellationException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    @Override
+    protected void describeMismatchSafely(CompletableFuture<Lock> item, 
Description mismatchDescription) {
+        mismatchDescription.appendText("lock future is completed 
").appendValue(item);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("lock future which should wait for 
").appendValue(waiterId);
+    }
+
+    public static LockWaiterMatcher waitsFor(UUID... txIds) {
+        return new LockWaiterMatcher(txIds[0]);
+    }

Review Comment:
   The `LockWaiterMatcher` has an unused field `item` (line 32) that is 
assigned in `matchesSafely` but never read. Additionally, the `waitsFor(UUID... 
txIds)` factory method accepts varargs but only uses `txIds[0]`, silently 
ignoring additional arguments. This is misleading API design.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -1031,15 +1035,44 @@ public void processDelayedAck(Object ignored, @Nullable 
Throwable err) {
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
-        var deadlockPreventionPolicy = new 
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
+        var deadlockPreventionPolicy = new WoundWaitDeadlockPreventionPolicy() 
{
+            @Override
+            public long waitTimeout() {
+                return DEFAULT_LOCK_TIMEOUT;
+            }
+
+            @Override
+            public void failAction(UUID owner) {
+                // TODO resolve tx with ABORT and delete locks
+                TxStateMeta state = txStateVolatileStorage.state(owner);
+                if (state == null || state.txCoordinatorId() == null) {
+                    return; // tx state is invalid. locks should be cleaned up 
by tx recovery process.
+                }
+
+                InternalClusterNode coordinator = 
topologyService.getById(state.txCoordinatorId());
+                if (coordinator == null) {
+                    return; // tx is abandoned. locks should be cleaned up by 
tx recovery process.
+                }
+
+                txMessageSender.kill(coordinator, owner);
+            }
+        };
+
+//        var deadlockPreventionPolicy = new WaitDieDeadlockPreventionPolicy() 
{
+//            @Override
+//            public long waitTimeout() {
+//                return DEFAULT_LOCK_TIMEOUT;
+//            }
+//        };

Review Comment:
   There is a large block of commented-out code (lines 1061-1066) and multiple 
TODO/debug comments throughout this method. This should be cleaned up before 
merging.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TraceableFuture.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+
+public class TraceableFuture<T> extends CompletableFuture<T> {
+    private StringWriter log = new StringWriter();
+
+    public synchronized void log(String msg) {
+        log.append("<" + msg + ">");
+    }
+
+    public String message() {
+        String str;
+        synchronized (this) {
+            str = log.toString();
+        }
+        return str;
+    }
+}

Review Comment:
   `TraceableFuture` is missing a license header. Also, it appears to be a 
debug utility that is not used in production code — it should be removed or 
moved to test fixtures.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java:
##########
@@ -132,6 +137,8 @@ public CompletableFuture<NetworkMessage> cleanup(
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
     ) {
+        //LOG.info("DBG: send cleanup " + txId);

Review Comment:
   Commented-out debug logging (`//LOG.info("DBG: send cleanup"...`) should be 
removed before merging.



##########
modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java:
##########
@@ -342,14 +331,8 @@ default <T> CompletableFuture<T> 
runInTransactionAsync(Function<Transaction, Com
      * @param <T> Closure result type.
      * @return The result.
      */
-    default <T> CompletableFuture<T> runInTransactionAsync(
+    <T> CompletableFuture<T> runInTransactionAsync(
             Function<Transaction, CompletableFuture<T>> clo,
             @Nullable TransactionOptions options
-    ) {
-        // This start timestamp is not related to transaction's begin 
timestamp and only serves as local time for counting the timeout of
-        // possible retries.
-        long startTimestamp = System.currentTimeMillis();
-        long initialTimeout = options == null ? 
TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : 
options.timeoutMillis();
-        return runInTransactionAsyncInternal(this, clo, options, 
startTimestamp, initialTimeout, null);
-    }
+    );

Review Comment:
   The `runInTransaction` default implementations were removed from 
`IgniteTransactions` interface and made abstract. This is a breaking API change 
— any existing third-party implementation of `IgniteTransactions` will fail to 
compile. The previous design with default methods was more backwards-compatible.



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java:
##########
@@ -87,7 +87,7 @@ public void committedGroupConfiguration(byte[] config) {
 
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
-        assertThreadAllowsToRead();
+        //assertThreadAllowsToRead();

Review Comment:
   This line disables a thread safety assertion (`assertThreadAllowsToRead`). 
This looks like a debug workaround that should not be merged — it weakens the 
thread safety guarantees of the storage layer.
   



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -104,14 +111,18 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
     private Executor delayedExecutor;
 
     /** Enlisted transactions. */
-    private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<Releasable>> 
txMap = new ConcurrentHashMap<>(1024);
+    private final ConcurrentHashMap<UUID, SealableQueue> txMap = new 
ConcurrentHashMap<>(1024);
 
     /** Coarse locks. */
     private final ConcurrentHashMap<Object, CoarseLockState> coarseMap = new 
ConcurrentHashMap<>();
 
     /** Tx state required to present tx labels in logs and exceptions. */
     private final VolatileTxStateMetaStorage txStateVolatileStorage;
 
+    private static class SealableQueue extends 
ConcurrentLinkedQueue<Releasable> {
+        boolean sealed;

Review Comment:
   The `sealed` field on `SealableQueue` is not volatile or otherwise 
synchronized, but it's accessed under `txMap.compute` and also read 
independently via the `sealed()` method. This can lead to visibility issues in 
`tryAcquireInternal` where `sealed(waiter.txId)` is called outside of 
`txMap.compute`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to