ibessonov commented on code in PR #7799:
URL: https://github.com/apache/ignite-3/pull/7799#discussion_r3039906279
##########
modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java:
##########
@@ -1614,7 +1630,7 @@ public int hashCode() {
private static Stream<Arguments> killTestContextFactory() {
return Stream.of(
argumentSet("kv", new
KillTestContext(TransactionException.class,
ItThinClientTransactionsTest::putKv)),
- argumentSet("sql", new KillTestContext(SqlException.class,
ItThinClientTransactionsTest::putSql))
+ argumentSet("sql", new KillTestContext(IgniteException.class,
ItThinClientTransactionsTest::putSql))
Review Comment:
Can we somehow check a more specific type here? `IgniteException` is too
broad for an integration test
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -112,18 +122,31 @@ public void testDataConsistency() throws
InterruptedException {
readThreads[i].start();
}
+ log.info("Started {} writers", WRITE_PARALLELISM);
+ log.info("Started {} readers", READ_PARALLELISM);
+
long cur = System.currentTimeMillis();
+ long curOps = ops.sum();
+
while (cur + DURATION_MILLIS > System.currentTimeMillis()) {
Thread.sleep(1000);
- log.info("Waiting...");
+ long tmp = ops.sum();
+ if (tmp == curOps) {
+ throw new AssertionError("Test doesn't make progress");
+ }
Review Comment:
Should be `assertNotEquals` instead
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -50,19 +58,20 @@ public class ItDataConsistencyTest extends
ClusterPerClassIntegrationTest {
private static final String ZONE_NAME = "test_zone";
private static final String TABLE_NAME = "accounts";
private static final int WRITE_PARALLELISM =
Runtime.getRuntime().availableProcessors();
- private static final int READ_PARALLELISM = 1;
+ private static final int READ_PARALLELISM = 0;
private static final int ACCOUNTS_COUNT = WRITE_PARALLELISM * 10;
private static final double INITIAL = 1000;
private static final double TOTAL = ACCOUNTS_COUNT * INITIAL;
- private static final int DURATION_MILLIS = 10000;
+ private static final int DURATION_MILLIS = 30000;
private CyclicBarrier startBar = new CyclicBarrier(WRITE_PARALLELISM +
READ_PARALLELISM, () -> log.info("Before test"));
private LongAdder ops = new LongAdder();
private LongAdder fails = new LongAdder();
+ private LongAdder restarts = new LongAdder();
private LongAdder readOps = new LongAdder();
private LongAdder readFails = new LongAdder();
private AtomicBoolean stop = new AtomicBoolean();
- private Random rng = new Random();
+ private Random rng = new Random(0);
Review Comment:
I feel like having a random seed is a better thing. If we need a
reproducibility then we should choose the seed in `setUp` and print it into a
log, instead of having it as 0 all the time.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
Review Comment:
Can we move this class, and maybe some other related changes, into a new PR?
I feel like it just complicates an already large enough bulk of changes
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -1031,13 +1035,35 @@ public void processDelayedAck(Object ignored, @Nullable
Throwable err) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
- var deadlockPreventionPolicy = new
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
+ var deadlockPreventionPolicy = new WoundWaitDeadlockPreventionPolicy()
{
Review Comment:
I believe that this class should not be anonymous.
Are there plans to make `DEFAULT_LOCK_TIMEOUT` configurable somehow? It is
`0` now (as far as I see), and we can't even test any alternative
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+ /** Hint for maximum concurrent txns. */
+ private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+ /** Field updater for inflights. */
+ private static final AtomicLongFieldUpdater<CleanupContext> UPDATER =
newUpdater(CleanupContext.class, "inflights");
+
+ /** Txn contexts. */
+ private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+ /**
+ * Registers the inflight for a transaction.
+ *
+ * @param txId The transaction id.
+ * @param testPred Test predicate.
Review Comment:
What exactly is it testing? I can't understand that from the javadoc, and
the implementation is not too clear either.
A documentation for the entire class would be welcomed. For example, it's
not clear to me what the nullability of `finishFut` means and why
`removeInflight` has no data race. There are some implicit invariants that are
hard to see from the code
##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java:
##########
@@ -150,4 +169,1220 @@ public void
testCompatibilityLockMapSizePropertyNameWasNotChanged() {
LOCK_MAP_SIZE_PROPERTY_NAME
);
}
+
+ @Test
+ public void testSingleKeyWrite() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+
+ assertTrue(fut0.isDone());
+
+ Collection<UUID> queue = lockManager.queue(key);
+
+ assertTrue(queue.size() == 1 && queue.iterator().next().equals(txId1));
+
+ Waiter waiter = lockManager.waiter(key, txId1);
+
+ assertTrue(waiter.locked());
+
+ lockManager.release(fut0.join());
+ }
+
+ @Test
+ public void testSingleKeyWriteLock() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
+
+ assertTrue(fut0.isDone());
+
+ assertTrue(txId2.compareTo(txId1) > 0);
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+
+ assertFalse(fut1.isDone());
+
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertFalse(lockManager.waiter(key, txId1).locked());
+
+ lockManager.release(fut0.join());
+
+ assertTrue(fut1.isDone());
+
+ assertNull(lockManager.waiter(key, txId2));
+ assertTrue(lockManager.waiter(key, txId1).locked());
+
+ lockManager.release(fut1.join());
+
+ assertNull(lockManager.waiter(key, txId2));
+ assertNull(lockManager.waiter(key, txId1));
+ }
+
+ @Test
+ public void downgradeLockOutOfTurnTest() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ lockManager.acquire(txId0, key, S).join();
+ Lock lock = lockManager.acquire(txId2, key, S).join();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+ expectConflict(fut2);
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+ fut1.join();
+
+ assertFalse(fut0.isDone());
+
+ lockManager.release(lock);
+ fut0.thenAccept(l -> lockManager.release(l));
+ }
+
+ @Test
+ public void upgradeLockImmediatelyTest() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, IS);
+ assertTrue(fut.isDone());
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, IS);
+ assertTrue(fut0.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, IS);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, key, IX);
+ assertTrue(fut2.isDone());
+
+ lockManager.release(fut1.join());
+ }
+
+ @Test
+ public void testSingleKeyReadWriteLock() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ UUID txId3 = TestTransactionIds.newTransactionId();
+ assertTrue(txId3.compareTo(txId2) > 0);
+ assertTrue(txId2.compareTo(txId1) > 0);
+ assertTrue(txId1.compareTo(txId0) > 0);
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, key, S);
+ assertTrue(fut3.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
+
+ assertTrue(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertTrue(lockManager.waiter(key, txId1).locked());
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut1.join());
+
+ assertTrue(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertNull(lockManager.waiter(key, txId1));
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut3.join());
+
+ assertNull(lockManager.waiter(key, txId3));
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertNull(lockManager.waiter(key, txId1));
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut2.join());
+
+ assertNull(lockManager.waiter(key, txId3));
+ assertNull(lockManager.waiter(key, txId2));
+ assertNull(lockManager.waiter(key, txId1));
+ assertTrue(lockManager.waiter(key, txId0).locked());
+ }
+
+ @Test
+ public void testSingleKeyReadWriteConflict() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ LockKey key = lockKey();
+
+ // Lock in order
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, S);
+ assertTrue(fut0.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut1.isDone());
+
+ lockManager.release(fut0.join());
+ assertTrue(fut1.isDone());
+
+ lockManager.release(fut1.join());
+
+ assertTrue(lockManager.queue(key).isEmpty());
+
+ // Lock not in order
+ fut0 = lockManager.acquire(txId0, key, S);
+ assertTrue(fut0.isDone());
+
+ try {
+ lockManager.acquire(txId1, key, X).join();
+
+ fail();
+ } catch (CompletionException e) {
+ // Expected.
+ }
Review Comment:
```suggestion
assertThat(lockManager.acquire(txId1, key, X),
CompletableFutureExceptionMatcher.willThrow(CompletionException.class));
```
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -144,6 +155,28 @@ private static int intProperty(SystemLocalConfiguration
systemProperties, String
return property == null ? defaultValue :
Integer.parseInt(property.propertyValue());
}
+ private Exception resolveTransactionSealedException(UUID txId) {
+ TxStateMeta meta = txStateVolatileStorage.state(txId);
+ Throwable cause = meta == null ? null : meta.lastException();
+ boolean isFinishedDueToTimeout = meta != null &&
meta.isFinishedDueToTimeoutOrFalse();
+ boolean isFinishedDueToError = meta != null && !isFinishedDueToTimeout
&& meta.lastExceptionErrorCode() != null;
+ Throwable publicCause = isFinishedDueToError ? cause : null;
+ Integer causeErrorCode = meta == null ? null :
meta.lastExceptionErrorCode();
+
+ return new TransactionException(
+ finishedTransactionErrorCode(isFinishedDueToTimeout,
isFinishedDueToError),
+ format("{} [{}, txState={}].",
+ finishedTransactionErrorMessage(
Review Comment:
As a separate change, could we please have a
`finishedTransactionErrorMessage` that accepts `TxStateMeta`, for example? I
see the code of calculating the arguments being duplicated over and over again,
which is a clear antipattern. A TODO with a Jira link would be enough, please
don't fix it in this PR
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2981,6 +2962,7 @@ private CompletableFuture<ReplicaResult>
processSingleEntryAction(ReadWriteSingl
});
}
case RW_UPSERT: {
+ // TODO IGNITE-28450
Review Comment:
Please add a short message that explains the issue
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -1072,45 +1187,44 @@ private List<WaiterImpl> release(UUID txId) {
*
* @return List of waiters to notify.
*/
- private List<WaiterImpl> unlockCompatibleWaiters() {
- if (!deadlockPreventionPolicy.usePriority() &&
deadlockPreventionPolicy.waitTimeout() == 0) {
+ private List<Runnable> unlockCompatibleWaiters() {
+ if (waiters.isEmpty()) {
return emptyList();
}
- ArrayList<WaiterImpl> toNotify = new ArrayList<>();
- Set<UUID> toFail = new HashSet<>();
+ ArrayList<Runnable> toNotify = new ArrayList<>();
Review Comment:
```suggestion
List<Runnable> toNotify = new ArrayList<>();
```
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -996,15 +1111,15 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter,
boolean skipFail) {
*/
@Override
public boolean tryRelease(UUID txId) {
- Collection<WaiterImpl> toNotify;
+ Collection<Runnable> toNotify;
Review Comment:
Just a note - a dedicated interface instead of `Runnable` would make code
navigation simpler. It would be trivial to find all its implementations, while
current code doesn't allow to do that
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10942;
+ private static final int BASE_REST_PORT = 10300;
+
+ private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+ protected static Ignite publicIgnite;
+ protected static IgniteImpl igniteImpl;
+
+ public static void main(String[] args) throws Exception {
+ TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+ runner.startCluster();
+ }
+
+ public IgniteImpl node(int idx) {
+ return unwrapIgniteImpl(igniteServers.get(idx).api());
+ }
+
+ private void startCluster() throws Exception {
+ Path workDir = workDir();
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ @Language("HOCON")
+ String configTemplate = "ignite {\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
Review Comment:
```suggestion
+ " network: {\n"
+ " port:{},\n"
+ " nodeFinder:{\n"
+ " netClusterNodes: [ {} ]\n"
```
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10942;
+ private static final int BASE_REST_PORT = 10300;
+
+ private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+ protected static Ignite publicIgnite;
+ protected static IgniteImpl igniteImpl;
+
+ public static void main(String[] args) throws Exception {
+ TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
Review Comment:
Is it possible to reuse the code from `org.apache.ignite.internal.Cluster`
or other test classes? I feel like there's a lot of copy-paste going on here
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10942;
+ private static final int BASE_REST_PORT = 10300;
+
+ private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+ protected static Ignite publicIgnite;
+ protected static IgniteImpl igniteImpl;
+
+ public static void main(String[] args) throws Exception {
+ TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+ runner.startCluster();
+ }
+
+ public IgniteImpl node(int idx) {
+ return unwrapIgniteImpl(igniteServers.get(idx).api());
+ }
+
+ private void startCluster() throws Exception {
+ Path workDir = workDir();
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ @Language("HOCON")
+ String configTemplate = "ignite {\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " storage.profiles: {"
+ + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+ + " " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " +
pageMemorySize() + " "
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " clientConnector.sendServerExceptionStackTraceToClient:
true\n"
+ + " rest.port: {},\n"
+ + " raft.fsync = " + fsync() + ",\n"
+ + " system.partitionsLogPath = \"" + logPath() + "\",\n"
+ + " failureHandler.handler: {\n"
+ + " type: \"" +
StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
Review Comment:
How "safe" is it to use "halt" failure handler in `integrationTest`? Maybe
"stop node" is enough?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -174,6 +199,16 @@ private void validate() {
}
assertEquals(TOTAL, total0, "Total amount invariant is not preserved");
+
+ for (int i = 0; i < initialNodes(); i++) {
+ IgniteImpl ignite = unwrapIgniteImpl(node(i));
+ try {
+ await("node " + i + " should release all locks").atMost(3,
TimeUnit.SECONDS)
+ .until(() ->
ignite.txManager().lockManager().isEmpty());
+ } catch (ConditionTimeoutException e) {
+ throw e;
+ }
Review Comment:
This is clearly an artifact of some active development. Please remove this
`try` block
##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java:
##########
@@ -203,6 +203,10 @@ public void testImplicitTransactionRetry() {
@Test
public void testImplicitTransactionTimeout() {
+ if (!txManager(accounts).lockManager().policy().reverse()) {
+ return; // This test scenario is only applicable to WaitDie.
+ }
Review Comment:
```suggestion
assumeFalse(txManager(accounts).lockManager().policy().reverse(),
"This test scenario is only applicable to WaitDie.");
```
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java:
##########
@@ -105,6 +105,11 @@ protected void
customizeInitParameters(InitParametersBuilder builder) {
@Test
public void testLockExceptionRetry() {
+ IgniteImpl ignite = node0();
+ if (!ignite.txManager().lockManager().policy().reverse()) {
+ return; // This test scenario is only applicable to reversed
policy.
+ }
Review Comment:
```suggestion
assumeFalse(ignite.txManager().lockManager().policy().reverse(), "
This test scenario is only applicable to reversed policy.");
```
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java:
##########
@@ -112,18 +122,31 @@ public void testDataConsistency() throws
InterruptedException {
readThreads[i].start();
}
+ log.info("Started {} writers", WRITE_PARALLELISM);
+ log.info("Started {} readers", READ_PARALLELISM);
+
long cur = System.currentTimeMillis();
+ long curOps = ops.sum();
+
while (cur + DURATION_MILLIS > System.currentTimeMillis()) {
Thread.sleep(1000);
Review Comment:
I know it's not your code, but `1000` is too much for a pause, can we make
it smaller please?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10942;
+ private static final int BASE_REST_PORT = 10300;
+
+ private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+ protected static Ignite publicIgnite;
+ protected static IgniteImpl igniteImpl;
+
+ public static void main(String[] args) throws Exception {
+ TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+ runner.startCluster();
+ }
+
+ public IgniteImpl node(int idx) {
+ return unwrapIgniteImpl(igniteServers.get(idx).api());
+ }
+
+ private void startCluster() throws Exception {
+ Path workDir = workDir();
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ @Language("HOCON")
+ String configTemplate = "ignite {\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " storage.profiles: {"
+ + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+ + " " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " +
pageMemorySize() + " "
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " clientConnector.sendServerExceptionStackTraceToClient:
true\n"
+ + " rest.port: {},\n"
+ + " raft.fsync = " + fsync() + ",\n"
+ + " system.partitionsLogPath = \"" + logPath() + "\",\n"
+ + " failureHandler.handler: {\n"
+ + " type: \"" +
StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
+ + " tryStop: true,\n"
+ + " timeoutMillis: 60000,\n" // 1 minute for graceful
shutdown
+ + " },\n"
+ + "}";
+
+ for (int i = 0; i < nodes(); i++) {
+ int port = BASE_PORT + i;
+ String nodeName = nodeName(port);
+
+ String config = IgniteStringFormatter.format(configTemplate, port,
connectNodeAddr,
+ BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+
igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName,
config, workDir.resolve(nodeName)));
+ }
+
+ String metaStorageNodeName = nodeName(BASE_PORT);
+
+ InitParameters initParameters = InitParameters.builder()
+ .metaStorageNodeNames(metaStorageNodeName)
+ .clusterName("cluster")
+ .clusterConfiguration(clusterConfiguration())
+ .build();
+
+ TestIgnitionManager.init(igniteServers.get(0), initParameters);
+
+ for (IgniteServer node : igniteServers) {
+ assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+ if (publicIgnite == null) {
+ publicIgnite = node.api();
+ igniteImpl = unwrapIgniteImpl(publicIgnite);
+ }
+ }
+ }
+
+ @Nullable
+ protected String clusterConfiguration() {
+ return "ignite {}";
+ }
+
+ protected static String nodeName(int port) {
+ return "node_" + port;
+ }
+
+ protected Path workDir() throws Exception {
+ return new File("c:/work/tpcc").toPath();
+ }
+
+ protected int pageMemorySize() {
+ return 2073741824;
Review Comment:
```suggestion
return 2 * Constants.GiB;
```
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java:
##########
@@ -72,6 +72,23 @@ public UUID txId() {
return txId;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Lock lock = (Lock) o;
+ return Objects.equals(lockKey, lock.lockKey) && lockMode ==
lock.lockMode && Objects.equals(txId, lock.txId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(lockKey, lockMode, txId);
Review Comment:
Please use explicit implementation. Lock hash is a performance-sensitive
thing, and `Objects.hash` introduces unintentional overhead while allocating
the vararg array.
##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedWaitDieDeadlockPreventionTest.java:
##########
@@ -77,10 +64,16 @@ public void youngNormalTxShouldBeAborted() {
var tx1 = beginTx(TxPriority.LOW);
var tx2 = beginTx(TxPriority.NORMAL);
- var key1 = key("test");
+ var key1 = lockKey("test");
assertThat(xlock(tx1, key1), willSucceedFast());
assertThat(xlock(tx2, key1), willThrow(LockException.class));
}
+
+ @Override
+ @Test
+ public void testNonFair() {
+ super.testNonFair();
+ }
Review Comment:
I don't think that I understand what's happening here. Is the test disabled
in a superclass? Maybe it should be commented
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WaitDieDeadlockPreventionPolicy.java:
##########
@@ -46,4 +47,19 @@ public Comparator<UUID> txIdComparator() {
public long waitTimeout() {
return 0;
}
+
+ @Override
+ public final Waiter allowWait(Waiter waiter, Waiter owner) {
+ int res = txIdComparator().compare(waiter.txId(), owner.txId());
Review Comment:
I have the same comment as before here, it also applies to
`WoundWaitDeadlockPreventionPolicy`. Does it make sense, what do you think?
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java:
##########
@@ -227,6 +232,11 @@ public void
retryImplicitTransactionsDueToReplicaMissTest() {
@Test
public void retryAfterLockFailureInSameTransaction() {
+ IgniteImpl ignite = node0();
+ if (!ignite.txManager().lockManager().policy().reverse()) {
+ return; // This test scenario is only applicable to reversed
policy.
+ }
Review Comment:
```suggestion
assumeFalse(ignite.txManager().lockManager().policy().reverse(), "
This test scenario is only applicable to reversed policy.");
```
##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java:
##########
@@ -150,4 +169,1220 @@ public void
testCompatibilityLockMapSizePropertyNameWasNotChanged() {
LOCK_MAP_SIZE_PROPERTY_NAME
);
}
+
+ @Test
+ public void testSingleKeyWrite() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+
+ assertTrue(fut0.isDone());
+
+ Collection<UUID> queue = lockManager.queue(key);
+
+ assertTrue(queue.size() == 1 && queue.iterator().next().equals(txId1));
+
+ Waiter waiter = lockManager.waiter(key, txId1);
+
+ assertTrue(waiter.locked());
+
+ lockManager.release(fut0.join());
+ }
+
+ @Test
+ public void testSingleKeyWriteLock() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
+
+ assertTrue(fut0.isDone());
+
+ assertTrue(txId2.compareTo(txId1) > 0);
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+
+ assertFalse(fut1.isDone());
+
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertFalse(lockManager.waiter(key, txId1).locked());
+
+ lockManager.release(fut0.join());
+
+ assertTrue(fut1.isDone());
+
+ assertNull(lockManager.waiter(key, txId2));
+ assertTrue(lockManager.waiter(key, txId1).locked());
+
+ lockManager.release(fut1.join());
+
+ assertNull(lockManager.waiter(key, txId2));
+ assertNull(lockManager.waiter(key, txId1));
+ }
+
+ @Test
+ public void downgradeLockOutOfTurnTest() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ lockManager.acquire(txId0, key, S).join();
+ Lock lock = lockManager.acquire(txId2, key, S).join();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+ expectConflict(fut2);
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+ fut1.join();
+
+ assertFalse(fut0.isDone());
+
+ lockManager.release(lock);
+ fut0.thenAccept(l -> lockManager.release(l));
Review Comment:
We should probably wait for this future, and then assert that "everything's
alright". Test doesn't look complete
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java:
##########
@@ -88,6 +89,14 @@ public int hashCode() {
@Override
public String toString() {
- return S.toString(LockKey.class, this, "ctx", contextId, "key", key);
+ return S.toString(LockKey.class, this, "ctx", contextId, "key",
dump(key));
+ }
+
+ private static String dump(Object key) {
+ if (key instanceof ByteBuffer) {
+ return Arrays.toString(((ByteBuffer) key).array());
Review Comment:
We do have a
`org.apache.ignite.internal.util.IgniteUtils#byteBufferToByteArray` already,
but it allocates new array. Maybe we need to introduce a more efficient
`toString` variant in a separate issue. This code should reuse existing methods
to avoid copy-pasting the code.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10800;
+ private static final int BASE_REST_PORT = 10300;
+
+ private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+ protected static Ignite publicIgnite;
+ protected static IgniteImpl igniteImpl;
+
+ public static void main(String[] args) throws Exception {
+ TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+ runner.startCluster();
+ }
+
+ public IgniteImpl node(int idx) {
+ return unwrapIgniteImpl(igniteServers.get(idx).api());
+ }
+
+ private void startCluster() throws Exception {
+ Path workDir = workDir();
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ @Language("HOCON")
+ String configTemplate = "ignite {\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " storage.profiles: {"
+ + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+ + " " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " +
pageMemorySize() + " "
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " clientConnector.sendServerExceptionStackTraceToClient:
true\n"
+ + " rest.port: {},\n"
+ + " raft.fsync = " + fsync() + ",\n"
+ + " system.partitionsLogPath = \"" + logPath() + "\",\n"
+ + " failureHandler.handler: {\n"
+ + " type: \"" +
StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
+ + " tryStop: true,\n"
+ + " timeoutMillis: 60000,\n" // 1 minute for graceful
shutdown
+ + " },\n"
+ + "}";
+
+ for (int i = 0; i < nodes(); i++) {
+ int port = BASE_PORT + i;
+ String nodeName = nodeName(port);
+
+ String config = IgniteStringFormatter.format(configTemplate, port,
connectNodeAddr,
+ BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+
igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName,
config, workDir.resolve(nodeName)));
+ }
+
+ String metaStorageNodeName = nodeName(BASE_PORT);
+
+ InitParameters initParameters = InitParameters.builder()
+ .metaStorageNodeNames(metaStorageNodeName)
+ .clusterName("cluster")
+ .clusterConfiguration(clusterConfiguration())
+ .build();
+
+ TestIgnitionManager.init(igniteServers.get(0), initParameters);
+
+ for (IgniteServer node : igniteServers) {
+ assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+ if (publicIgnite == null) {
+ publicIgnite = node.api();
+ igniteImpl = unwrapIgniteImpl(publicIgnite);
+ }
+ }
+ }
+
+ @Nullable
+ protected String clusterConfiguration() {
+ return "ignite {}";
+ }
+
+ protected static String nodeName(int port) {
+ return "node_" + port;
+ }
+
+ protected Path workDir() throws Exception {
+ return new File("c:/work/tpcc").toPath();
Review Comment:
Yes, this part clearly must be fixed
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+ /** Hint for maximum concurrent txns. */
+ private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+ /** Field updater for inflights. */
+ private static final AtomicLongFieldUpdater<CleanupContext> UPDATER =
newUpdater(CleanupContext.class, "inflights");
Review Comment:
Name of this field should reflect the name of the field that it updates. I
recommend to rename it to `INFLIGHTS_UPDATER`, for example
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -357,16 +398,52 @@ private LockState adjustLockState(LockState state,
LockState v) {
}
}
- private void track(UUID txId, Releasable val) {
+ private void seal(UUID txId) {
+ txMap.compute(txId, (k, v) -> {
+ if (v == null) {
+ return null;
+ }
+
+ v.sealed = true;
+
+ return v;
+ });
+ }
+
+ private boolean sealed(UUID txId) {
+ boolean[] ret = {false};
+ txMap.compute(txId, (k, v) -> {
+ if (v == null) {
+ return null;
+ }
+
+ if (v.sealed) {
+ ret[0] = true;
+ }
+
+ return v;
+ });
Review Comment:
Is it possible to avoid "writing" into a concurrent map on read operations?
I believe that a simple "get" should be enough here if we make `v.sealed`
volatile, this code will become non-blocking then (I think). Certainly more
efficient
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+ /** Hint for maximum concurrent txns. */
+ private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+ /** Field updater for inflights. */
+ private static final AtomicLongFieldUpdater<CleanupContext> UPDATER =
newUpdater(CleanupContext.class, "inflights");
+
+ /** Txn contexts. */
+ private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+ /**
+ * Registers the inflight for a transaction.
+ *
+ * @param txId The transaction id.
+ * @param testPred Test predicate.
+ * @param requestType Request type.
+ *
+ * @return Cleanup context.
+ */
+ @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID> testPred,
RequestType requestType) {
+ boolean[] res = {true};
+
+ CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new CleanupContext();
+ }
+
+ if (ctx.finishFut != null || testPred.test(txId)) {
+ res[0] = false;
+ } else {
+ UPDATER.incrementAndGet(ctx);
+ if (requestType.isWrite()) {
+ ctx.hasWrites = true;
+ }
+ }
+
+ return ctx;
+ });
+
+ return res[0] ? ctx0 : null;
+ }
+
+ /**
+ * Runs a closure under a transaction lock.
+ *
+ * @param txId Transaction id.
+ * @param r Runnable.
+ */
+ public void runClosure(UUID txId, Runnable r) {
+ txCtxMap.compute(txId, (uuid, ctx) -> {
+ r.run();
+
+ return ctx;
+ });
+ }
+
+ /**
+ * Unregisters the inflight for a transaction.
+ *
+ * @param ctx Cleanup context.
+ */
+ static void removeInflight(CleanupContext ctx) {
+ long val = UPDATER.decrementAndGet(ctx);
+
+ if (ctx.finishFut != null && val == 0) {
+ ctx.finishFut.complete(null);
+ }
+ }
+
+ /**
+ * Get finish future.
Review Comment:
It's not just `get`, it changes internal state, as far as I see
##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java:
##########
@@ -150,4 +169,1220 @@ public void
testCompatibilityLockMapSizePropertyNameWasNotChanged() {
LOCK_MAP_SIZE_PROPERTY_NAME
);
}
+
+ @Test
+ public void testSingleKeyWrite() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+
+ assertTrue(fut0.isDone());
+
+ Collection<UUID> queue = lockManager.queue(key);
+
+ assertTrue(queue.size() == 1 && queue.iterator().next().equals(txId1));
+
+ Waiter waiter = lockManager.waiter(key, txId1);
+
+ assertTrue(waiter.locked());
+
+ lockManager.release(fut0.join());
+ }
+
+ @Test
+ public void testSingleKeyWriteLock() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
+
+ assertTrue(fut0.isDone());
+
+ assertTrue(txId2.compareTo(txId1) > 0);
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+
+ assertFalse(fut1.isDone());
+
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertFalse(lockManager.waiter(key, txId1).locked());
+
+ lockManager.release(fut0.join());
+
+ assertTrue(fut1.isDone());
+
+ assertNull(lockManager.waiter(key, txId2));
+ assertTrue(lockManager.waiter(key, txId1).locked());
+
+ lockManager.release(fut1.join());
+
+ assertNull(lockManager.waiter(key, txId2));
+ assertNull(lockManager.waiter(key, txId1));
+ }
+
+ @Test
+ public void downgradeLockOutOfTurnTest() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ lockManager.acquire(txId0, key, S).join();
+ Lock lock = lockManager.acquire(txId2, key, S).join();
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+ expectConflict(fut2);
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+ fut1.join();
+
+ assertFalse(fut0.isDone());
+
+ lockManager.release(lock);
+ fut0.thenAccept(l -> lockManager.release(l));
+ }
+
+ @Test
+ public void upgradeLockImmediatelyTest() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, IS);
+ assertTrue(fut.isDone());
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, IS);
+ assertTrue(fut0.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, IS);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, key, IX);
+ assertTrue(fut2.isDone());
+
+ lockManager.release(fut1.join());
+ }
+
+ @Test
+ public void testSingleKeyReadWriteLock() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ UUID txId3 = TestTransactionIds.newTransactionId();
+ assertTrue(txId3.compareTo(txId2) > 0);
+ assertTrue(txId2.compareTo(txId1) > 0);
+ assertTrue(txId1.compareTo(txId0) > 0);
+ LockKey key = lockKey();
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, key, S);
+ assertTrue(fut3.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
+
+ assertTrue(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertTrue(lockManager.waiter(key, txId1).locked());
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut1.join());
+
+ assertTrue(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertNull(lockManager.waiter(key, txId1));
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut3.join());
+
+ assertNull(lockManager.waiter(key, txId3));
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertNull(lockManager.waiter(key, txId1));
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut2.join());
+
+ assertNull(lockManager.waiter(key, txId3));
+ assertNull(lockManager.waiter(key, txId2));
+ assertNull(lockManager.waiter(key, txId1));
+ assertTrue(lockManager.waiter(key, txId0).locked());
+ }
+
+ @Test
+ public void testSingleKeyReadWriteConflict() {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ LockKey key = lockKey();
+
+ // Lock in order
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, S);
+ assertTrue(fut0.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut1.isDone());
+
+ lockManager.release(fut0.join());
+ assertTrue(fut1.isDone());
+
+ lockManager.release(fut1.join());
+
+ assertTrue(lockManager.queue(key).isEmpty());
+
+ // Lock not in order
+ fut0 = lockManager.acquire(txId0, key, S);
+ assertTrue(fut0.isDone());
+
+ try {
+ lockManager.acquire(txId1, key, X).join();
+
+ fail();
+ } catch (CompletionException e) {
+ // Expected.
+ }
Review Comment:
With a static import, of course, I mentioned class name for comment's
clarity only
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+ /** Hint for maximum concurrent txns. */
+ private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+ /** Field updater for inflights. */
+ private static final AtomicLongFieldUpdater<CleanupContext> UPDATER =
newUpdater(CleanupContext.class, "inflights");
+
+ /** Txn contexts. */
+ private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+ /**
+ * Registers the inflight for a transaction.
+ *
+ * @param txId The transaction id.
+ * @param testPred Test predicate.
+ * @param requestType Request type.
+ *
+ * @return Cleanup context.
+ */
+ @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID> testPred,
RequestType requestType) {
+ boolean[] res = {true};
+
+ CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new CleanupContext();
+ }
+
+ if (ctx.finishFut != null || testPred.test(txId)) {
+ res[0] = false;
+ } else {
+ UPDATER.incrementAndGet(ctx);
+ if (requestType.isWrite()) {
+ ctx.hasWrites = true;
+ }
+ }
+
+ return ctx;
+ });
+
+ return res[0] ? ctx0 : null;
+ }
+
+ /**
+ * Runs a closure under a transaction lock.
+ *
+ * @param txId Transaction id.
+ * @param r Runnable.
+ */
+ public void runClosure(UUID txId, Runnable r) {
+ txCtxMap.compute(txId, (uuid, ctx) -> {
Review Comment:
It synchronizes the entire bucket, not just a single key. Should we make a
load factor smaller, assuming you're planning to use this method frequently, or
maybe consider a more fine-grained manual locking?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Predicate;
+import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+ /** Hint for maximum concurrent txns. */
+ private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+ /** Field updater for inflights. */
+ private static final AtomicLongFieldUpdater<CleanupContext> UPDATER =
newUpdater(CleanupContext.class, "inflights");
+
+ /** Txn contexts. */
+ private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+ /**
+ * Registers the inflight for a transaction.
+ *
+ * @param txId The transaction id.
+ * @param testPred Test predicate.
+ * @param requestType Request type.
+ *
+ * @return Cleanup context.
+ */
+ @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID> testPred,
RequestType requestType) {
+ boolean[] res = {true};
+
+ CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new CleanupContext();
+ }
+
+ if (ctx.finishFut != null || testPred.test(txId)) {
+ res[0] = false;
+ } else {
+ UPDATER.incrementAndGet(ctx);
+ if (requestType.isWrite()) {
+ ctx.hasWrites = true;
+ }
+ }
+
+ return ctx;
+ });
+
+ return res[0] ? ctx0 : null;
+ }
+
+ /**
+ * Runs a closure under a transaction lock.
+ *
+ * @param txId Transaction id.
+ * @param r Runnable.
+ */
+ public void runClosure(UUID txId, Runnable r) {
+ txCtxMap.compute(txId, (uuid, ctx) -> {
+ r.run();
+
+ return ctx;
+ });
+ }
+
+ /**
+ * Unregisters the inflight for a transaction.
+ *
+ * @param ctx Cleanup context.
+ */
+ static void removeInflight(CleanupContext ctx) {
+ long val = UPDATER.decrementAndGet(ctx);
+
+ if (ctx.finishFut != null && val == 0) {
+ ctx.finishFut.complete(null);
+ }
+ }
+
+ /**
+ * Get finish future.
+ *
+ * @param txId Transaction id.
+ * @return The future.
+ */
+ public @Nullable CleanupContext finishFuture(UUID txId) {
+ return txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ return null;
+ }
+
+ if (ctx.finishFut == null) {
+ ctx.finishFut = UPDATER.get(ctx) == 0 ? nullCompletedFuture()
: new CompletableFuture<>();
+ }
+
+ // Avoiding a data race with a concurrent decrementing thread,
which might not see finishFut publication.
+ if (UPDATER.get(ctx) == 0 && !ctx.finishFut.isDone()) {
+ ctx.finishFut = nullCompletedFuture();
+ }
Review Comment:
In this case we're ignoring the `new CompletableFuture<>()` that we just
allocated, right? Is there a chance of this future leaking outside of this
method?
##########
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -317,6 +317,14 @@ void
testManualRebalanceIfMajorityIsLostSpecifyPartitions() throws Exception {
int anotherPartId = 0;
IgniteImpl node0 = igniteImpl(0);
+
+ if (!node0.txManager().lockManager().policy().reverse()) {
Review Comment:
I recommend always using `assume*` statements in such situations, as already
mentioned in my previous comments
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/NoWaitDeadlockPreventionPolicy.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.Waiter;
+
+/**
+ * Nowait deadlock prevention policy.
+ */
+public class NoWaitDeadlockPreventionPolicy implements
DeadlockPreventionPolicy {
+ /** {@inheritDoc} */
Review Comment:
These comments are not necessary anymore
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -861,35 +949,114 @@ IgniteBiTuple<CompletableFuture<Void>, LockMode>
tryAcquire(UUID txId, LockMode
}
}
- if (!isWaiterReadyToNotify(waiter, false)) {
- if (deadlockPreventionPolicy.waitTimeout() > 0) {
+ notifications = tryAcquireInternal(waiter, prev == null,
false);
+ }
+
+ // Callback outside the monitor.
+ for (Runnable r : notifications) {
+ r.run();
+ }
+
+ return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
+ }
+
+ private void failWaiter(WaiterImpl waiter, List<Runnable>
notifications, Exception exception) {
+ if (!waiter.locked()) {
+ waiters.remove(waiter.txId());
+ } else if (waiter.hasLockIntent()) {
+ waiter.refuseIntent(); // Reset lock intention.
+ }
+ waiter.fail(exception);
+ notifications.add(waiter::notifyLocked);
+ }
+
+ private List<Runnable> tryAcquireInternal(WaiterImpl waiter, boolean
track, boolean unlock) {
+ List<Runnable> notifications = new ArrayList<>();
+
+ if (sealed(waiter.txId)) {
+ failWaiter(waiter, notifications,
resolveTransactionSealedException(waiter.txId));
+ return notifications;
+ }
+
+ boolean[] needWait = {false};
+ boolean[] notified = {false};
+
+ findConflicts(waiter, owner -> {
+ assert !waiter.txId.equals(owner.txId);
+ @Nullable WaiterImpl toFail = (WaiterImpl)
deadlockPreventionPolicy.allowWait(waiter, owner);
Review Comment:
Is the cast to `WaiterImpl` necessary? I don't think so, could you please
explain this decision?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionIds.java:
##########
@@ -86,4 +86,12 @@ private static long combine(int nodeId, TxPriority priority)
{
// Shift the int 32 bits and combine with the boolean
return ((long) nodeId << 32) | priorityAsInt;
}
+
+ public static int hash(UUID txId, int divisor) {
+ return Math.floorMod(spread(txId.hashCode()), divisor);
Review Comment:
`spread` is already positive, why don't we use `%` here?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReversedWaitDieDeadlockPreventionPolicy.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.Waiter;
+
+/**
+ * Reversed wait die implementation. Same as wait die, but reverses the wait
order: younger is allowed to wait for older, older is rejected
+ * if conflicts with younger.
+ */
+public class ReversedWaitDieDeadlockPreventionPolicy implements
DeadlockPreventionPolicy {
+ private static final TxIdPriorityComparator TX_ID_PRIORITY_COMPARATOR =
new TxIdPriorityComparator();
+
+ /** {@inheritDoc} */
+ @Override
+ public final Comparator<UUID> txIdComparator() {
+ return TX_ID_PRIORITY_COMPARATOR;
+ }
+
+ @Override
+ public Waiter allowWait(Waiter waiter, Waiter owner) {
+ int res = txIdComparator().compare(waiter.txId(), owner.txId());
Review Comment:
I'm not totally convinced that my comment makes sense, but I would rather
see the
```
int res = TxIdPriorityComparator.compare(waiter.txId(), owner.txId());
...
```
with a static call. Comparison is a potentially hot operation, and here we
have too many virtual calls when all we need is, for the most part, to compare
a few numeric values.
`TxIdPriorityComparator` itself doesn't look optimized either, we should do
something about it too. In a separate Jira, I presume.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]