This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b856aad93a7 IGNITE-25978 Optimize implicit RO transaction flow
b856aad93a7 is described below
commit b856aad93a789cdbcbacb41b144173d1c012cf14
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue Jul 29 17:43:45 2025 +0300
IGNITE-25978 Optimize implicit RO transaction flow
---
.../handler/requests/table/ClientTableCommon.java | 17 +-
modules/runner/build.gradle | 10 -
.../benchmark/AbstractMultiNodeBenchmark.java | 11 +-
.../internal/benchmark/ClientKvBenchmark.java | 17 +-
.../internal/benchmark/RemoteKvBenchmark.java | 51 -----
.../internal/table/ItReadOnlyTransactionTest.java | 3 +-
.../ignite/internal/table/AbstractTableView.java | 6 +-
.../internal/table/distributed/TableUtils.java | 2 +-
.../replicator/PartitionReplicaListener.java | 24 ++-
.../distributed/storage/InternalTableImpl.java | 24 +--
.../ignite/internal/table/TxAbstractTest.java | 4 +-
.../internal/tx/ItTransactionMetricsTest.java | 4 +-
...l.java => ReadOnlyImplicitTransactionImpl.java} | 133 ++++++-------
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 6 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 10 +
.../ignite/internal/tx/impl/TxManagerImpl.java | 208 +++++++++++----------
.../apache/ignite/internal/tx/TxManagerTest.java | 14 ++
.../tx/impl/ReadOnlyTransactionImplTest.java | 1 -
18 files changed, 261 insertions(+), 284 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 3732906dc04..1c6630d1135 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -471,7 +471,7 @@ public class ClientTableCommon {
if (tx == null) {
// Implicit transactions do not use an observation timestamp
because RW never depends on it, and implicit RO is always direct.
// The direct transaction uses a current timestamp on the primary
replica by definition.
- tx = startImplicitTx(readTs, txManager, null, readOnly);
+ tx = startImplicitTx(readTs, txManager, readOnly);
}
return tx;
@@ -494,23 +494,20 @@ public class ClientTableCommon {
boolean readOnly,
InternalTxOptions options
) {
- tsTracker.update(currentTs);
+ if (readOnly) {
+ tsTracker.update(currentTs);
- return txManager.beginExplicit(
- tsTracker,
- readOnly,
- options
- );
+ return txManager.beginExplicitRo(tsTracker, options);
+ } else {
+ return txManager.beginExplicitRw(tsTracker, options);
+ }
}
private static InternalTransaction startImplicitTx(
HybridTimestampTracker tsTracker,
TxManager txManager,
- @Nullable HybridTimestamp currentTs,
boolean readOnly
) {
- tsTracker.update(currentTs);
-
return txManager.beginImplicit(tsTracker, readOnly);
}
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index e1f20d4fe06..6411441821d 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -283,16 +283,6 @@ tasks.register('runClientGetBenchmark', JavaExec) {
enableAssertions = true
}
-tasks.register('runRemoteBenchmark', JavaExec) {
- mainClass = 'org.apache.ignite.internal.benchmark.RemoteKvBenchmark'
-
- jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true',
'-Xmx16g']
-
- classpath = sourceSets.integrationTest.runtimeClasspath
-
- enableAssertions = true
-}
-
jar {
manifest {
attributes(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index ee653459bf1..eef5c815306 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -73,6 +73,9 @@ public class AbstractMultiNodeBenchmark {
protected static Ignite publicIgnite;
protected static IgniteImpl igniteImpl;
+ @Param({"false"})
+ protected boolean remote;
+
@Param({"false"})
private boolean fsync;
@@ -87,7 +90,7 @@ public class AbstractMultiNodeBenchmark {
@Setup
public void nodeSetUp() throws Exception {
System.setProperty("jraft.available_processors", "2");
- if (!remote()) {
+ if (!remote) {
startCluster();
}
@@ -192,7 +195,7 @@ public class AbstractMultiNodeBenchmark {
}
private void startCluster() throws Exception {
- if (remote()) {
+ if (remote) {
throw new AssertionError("Can't start the cluster in remote mode");
}
@@ -292,8 +295,4 @@ public class AbstractMultiNodeBenchmark {
protected int replicaCount() {
return CatalogUtils.DEFAULT_REPLICA_COUNT;
}
-
- protected boolean remote() {
- return false;
- }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
index e3fb22ba783..ded0f63f34d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
@@ -65,9 +65,6 @@ public abstract class ClientKvBenchmark extends
AbstractMultiNodeBenchmark {
@Param({"0"})
protected int offset; // 1073741824 for second client to ensure unique keys
- @Param({"false"})
- private boolean fsync;
-
@Param({"32"})
private int partitionCount;
@@ -90,8 +87,13 @@ public abstract class ClientKvBenchmark extends
AbstractMultiNodeBenchmark {
@Override
public void nodeSetUp() throws Exception {
-
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"false");
-
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
"false");
+ if (remote) {
+ client = IgniteClient.builder().addresses(addresses()).build();
+ publicIgnite = client;
+ } else {
+
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"false");
+
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
"false");
+ }
super.nodeSetUp();
}
@@ -153,11 +155,6 @@ public abstract class ClientKvBenchmark extends
AbstractMultiNodeBenchmark {
new Runner(builder.build()).run();
}
- @Override
- protected boolean fsync() {
- return fsync;
- }
-
@Override
protected int nodes() {
return 1;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
deleted file mode 100644
index 8f3cae627d3..00000000000
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.benchmark;
-
-import org.apache.ignite.client.IgniteClient;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * Benchmark for a single upsert operation using externally enabled cluster.
- */
-public class RemoteKvBenchmark extends ClientKvBenchmark {
- @Override
- protected boolean remote() {
- return true;
- }
-
- @Override
- protected String[] addresses() {
- return new String[]{};
- }
-
- @Override
- public void nodeSetUp() throws Exception {
- client = IgniteClient.builder().addresses(addresses()).build();
- publicIgnite = client;
-
- super.nodeSetUp();
- }
-
- /**
- * Benchmark's entry point.
- */
- public static void main(String[] args) throws RunnerException {
- runBenchmark(RemoteKvBenchmark.class, args);
- }
-}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
index 00501cb5fa7..183b7a6c4b6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
@@ -123,7 +123,8 @@ public class ItReadOnlyTransactionTest extends
ClusterPerClassIntegrationTest {
// but we check that the new transaction does not appear.
assertFalse(txRwStatesAfter > txRwStatesBefore, "RW transaction
was stated unexpectedly.");
- assertEquals(2, txFinishedAfter - txFinishedBefore, format(
+ // Implicit RO operations are not counted as transactions.
+ assertEquals(0, txFinishedAfter - txFinishedBefore, format(
"Unexpected finished transaction quantity [i={},
beforeOp={}, afterOp={}]",
i,
txFinishedBefore,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 916ba0259f5..f67fc3af5f4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -21,7 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
import static
org.apache.ignite.internal.table.criteria.CriteriaExceptionMapperUtil.mapToPublicCriteriaException;
-import static
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicableTx;
+import static
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicable;
import static org.apache.ignite.internal.util.ExceptionUtils.isOrCausedBy;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ViewUtils.sync;
@@ -139,7 +139,7 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
@Nullable Integer previousSchemaVersion,
KvAction<T> action
) {
- CompletableFuture<Integer> schemaVersionFuture =
isDirectFlowApplicableTx(tx)
+ CompletableFuture<Integer> schemaVersionFuture = tx == null
? schemaVersions.schemaVersionAtCurrentTime(tbl.tableId())
: schemaVersions.schemaVersionAt(tx.schemaTimestamp(),
tbl.tableId());
@@ -157,7 +157,7 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
// version corresponding to the transaction
creation moment, so this mismatch is not tolerable: we need
// to retry the operation here.
- assert isDirectFlowApplicableTx(tx) : "Only
for direct flow applicable tx a retry might be requested";
+ assert isDirectFlowApplicable(tx) : "Only for
direct flow applicable tx a retry might be requested";
assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion);
// Repeat.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
index 6e4502a3d7f..8f3cd2d2b06 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
@@ -114,7 +114,7 @@ public class TableUtils {
* @param tx Transaction of {@code null}.
* @return True of direct flow is applicable for the transaction.
*/
- public static boolean isDirectFlowApplicableTx(@Nullable
InternalTransaction tx) {
+ public static boolean isDirectFlowApplicable(@Nullable InternalTransaction
tx) {
// TODO https://issues.apache.org/jira/browse/IGNITE-24218 Remove this
method ot use tx == null || tx.direct() instead.
return tx == null || (tx.implicit() && tx.isReadOnly());
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 8db22fcc6eb..b08c54b7548 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -56,6 +56,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -2019,20 +2020,29 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
ReadOnlyDirectMultiRowReplicaRequest request,
HybridTimestamp opStartTimestamp) {
List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
- HybridTimestamp readTimestamp = opStartTimestamp;
- if (request.requestType() != RO_GET_ALL) {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- format("Unknown single request [actionType={}]",
request.requestType()));
- }
+ assert request.requestType() == RO_GET_ALL;
CompletableFuture<BinaryRow>[] resolutionFuts = new
CompletableFuture[primaryKeys.size()];
for (int i = 0; i < primaryKeys.size(); i++) {
- resolutionFuts[i] = resolveRowByPkForReadOnly(primaryKeys.get(i),
readTimestamp);
+ resolutionFuts[i] = resolveRowByPkForReadOnly(primaryKeys.get(i),
opStartTimestamp);
}
- return allOfToList(resolutionFuts);
+ return allOfToList(resolutionFuts).thenApply(rows -> {
+ // Validate read correctness.
+ HybridTimestamp lwm = lowWatermark.getLowWatermark();
+
+ if (lwm != null && opStartTimestamp.compareTo(lwm) < 0) {
+ throw new IgniteInternalException(
+ TX_READ_ONLY_TOO_OLD_ERR,
+ "Attempted to read data below the garbage collection
watermark: [readTimestamp={}, gcTimestamp={}]",
+ opStartTimestamp,
+ lowWatermark.getLowWatermark());
+ }
+
+ return rows;
+ });
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 980f6211799..7a42adea027 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -42,7 +42,7 @@ import static
org.apache.ignite.internal.partition.replicator.network.replicatio
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT;
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
-import static
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicableTx;
+import static
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicable;
import static
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
import static
org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
@@ -760,7 +760,7 @@ public class InternalTableImpl implements InternalTable {
* @param <T> Operation return type.
* @return The future.
*/
- private <T> CompletableFuture<T> postEnlist(
+ private static <T> CompletableFuture<T> postEnlist(
CompletableFuture<T> fut, boolean autoCommit, InternalTransaction
tx0, boolean full
) {
assert !(autoCommit && full) : "Invalid combination of flags";
@@ -796,7 +796,7 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * Evaluates the single-row request to the cluster for a read-only
single-partition transaction.
+ * Evaluates the single-row request to the cluster for a read-only
single-partition implicit transaction.
*
* @param tx Transaction or {@code null} if it is not started yet.
* @param row Binary row.
@@ -819,20 +819,18 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * Evaluates the multi-row request to the cluster for a read-only
single-partition transaction.
+ * Evaluates the multi-row request to the cluster for a read-only
single-partition implicit transaction.
*
- * @param tx Transaction or {@code null} if it is not started yet.
* @param rows Rows.
* @param op Replica requests factory.
* @param <R> Result type.
* @return The future.
*/
private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
- @Nullable InternalTransaction tx,
Collection<BinaryRowEx> rows,
BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
) {
- InternalTransaction actualTx = startImplicitRoTxIfNeeded(tx);
+ InternalTransaction actualTx =
txManager.beginImplicitRo(observableTimestampTracker);
int partId = partitionId(rows.iterator().next());
@@ -902,12 +900,13 @@ public class InternalTableImpl implements InternalTable {
return tx.finish(false, clockService.current(), false, false)
.handle((ignored, err) -> {
if (err != null) {
+ // Preserve failed state.
e.addSuppressed(err);
}
sneakyThrow(e);
return null;
- }); // Preserve failed state.
+ });
}
return tx.finish(true, clockService.current(), false,
false).thenApply(ignored -> r);
@@ -919,7 +918,7 @@ public class InternalTableImpl implements InternalTable {
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable
InternalTransaction tx) {
checkTransactionFinishStarted(tx);
- if (isDirectFlowApplicableTx(tx)) {
+ if (isDirectFlowApplicable(tx)) {
return evaluateReadOnlyPrimaryNode(
tx,
keyRow,
@@ -1014,9 +1013,8 @@ public class InternalTableImpl implements InternalTable {
return emptyListCompletedFuture();
}
- if (isDirectFlowApplicableTx(tx) && isSinglePartitionBatch(keyRows)) {
+ if (tx == null && isSinglePartitionBatch(keyRows)) {
return evaluateReadOnlyPrimaryNode(
- tx,
keyRows,
(groupId, consistencyToken) ->
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
.groupId(serializeReplicationGroupId(groupId))
@@ -1030,6 +1028,8 @@ public class InternalTableImpl implements InternalTable {
}
if (tx != null && tx.isReadOnly()) {
+ assert !tx.implicit() : "implicit RO getAll not supported";
+
BinaryRowEx firstRow = keyRows.iterator().next();
return evaluateReadOnlyRecipientNode(partitionId(firstRow),
tx.readTimestamp())
@@ -2382,7 +2382,7 @@ public class InternalTableImpl implements InternalTable {
return replicaMeta.getStartTime().longValue();
}
- private void checkTransactionFinishStarted(@Nullable InternalTransaction
transaction) {
+ private static void checkTransactionFinishStarted(@Nullable
InternalTransaction transaction) {
if (transaction != null && transaction.isFinishingOrFinished()) {
boolean isFinishedDueToTimeout =
transaction.isRolledBackWithTimeoutExceeded();
throw new TransactionException(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 8743aa60df0..58f3da7f5c4 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -458,7 +458,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals(BALANCE_1 - DELTA, view.get(null,
makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, view.get(null,
makeKey(2)).doubleValue("balance"));
- assertEquals(5, clientTxManager().finished());
+ assertEquals(3, clientTxManager().finished());
assertEquals(0, clientTxManager().pending());
}
@@ -483,7 +483,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(null,
makeKey(2)).doubleValue("balance"));
- assertEquals(5, clientTxManager().finished());
+ assertEquals(3, clientTxManager().finished());
assertEquals(0, clientTxManager().pending());
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
index 82f90c45a02..a5923f777e1 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
@@ -157,8 +157,8 @@ public class ItTransactionMetricsTest extends
ClusterPerClassIntegrationTest {
// Check that all transaction metrics ere not changed except
TotalCommits and RoCommits.
testMetricValues(metrics0, actualMetrics0, "TotalCommits",
"RoCommits");
- assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + 1));
- assertThat(actualMetrics0.get("RoCommits"),
is(metrics0.get("RoCommits") + 1));
+ assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + (implicit ? 0 : 1)));
+ assertThat(actualMetrics0.get("RoCommits"),
is(metrics0.get("RoCommits") + (implicit ? 0 : 1)));
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyImplicitTransactionImpl.java
similarity index 51%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyImplicitTransactionImpl.java
index cd3705215a5..0833cd3be94 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyImplicitTransactionImpl.java
@@ -18,56 +18,78 @@
package org.apache.ignite.internal.tx.impl;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
/**
- * The read-only implementation of an internal transaction.
+ * The special lightweight implementation for read-only implicit transaction.
*/
-public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
- /** The read timestamp. */
- private final HybridTimestamp readTimestamp;
+public class ReadOnlyImplicitTransactionImpl implements InternalTransaction {
+ private static final UUID FAKE_ID = new UUID(0, 0);
- /** Prevents double finish of the transaction. */
- private final AtomicBoolean finishGuard = new AtomicBoolean();
+ private final HybridTimestampTracker observableTsTracker;
- /** Transaction future. */
- private final CompletableFuture<Void> txFuture;
+ private final HybridTimestamp createTs;
/**
* The constructor.
*
- * @param txManager The tx manager.
* @param observableTsTracker Observable timestamp tracker.
- * @param id The id.
- * @param txCoordinatorId Transaction coordinator inconsistent ID.
- * @param implicit True for an implicit transaction, false for an ordinary
one.
- * @param timeout The timeout.
- * @param readTimestamp The read timestamp.
+ * @param createTs Create timestamp.
*/
- ReadOnlyTransactionImpl(
- TxManagerImpl txManager,
- HybridTimestampTracker observableTsTracker,
- UUID id,
- UUID txCoordinatorId,
- boolean implicit,
- long timeout,
- HybridTimestamp readTimestamp,
- CompletableFuture<Void> txFuture
- ) {
- super(txManager, observableTsTracker, id, txCoordinatorId, implicit,
timeout);
+ ReadOnlyImplicitTransactionImpl(HybridTimestampTracker
observableTsTracker, HybridTimestamp createTs) {
+ this.observableTsTracker = observableTsTracker;
+ this.createTs = createTs;
+ }
+
+ @Override
+ public UUID id() {
+ return FAKE_ID;
+ }
+
+ @Override
+ public TxState state() {
+ return null;
+ }
+
+ @Override
+ public UUID coordinatorId() {
+ return null;
+ }
+
+ @Override
+ public boolean implicit() {
+ return true;
+ }
+
+ @Override
+ public boolean remote() {
+ return false;
+ }
+
+ @Override
+ public long getTimeout() {
+ return 0;
+ }
- this.readTimestamp = readTimestamp;
- this.txFuture = txFuture;
+ @Override
+ public boolean isRolledBackWithTimeoutExceeded() {
+ return false;
+ }
+
+ @Override
+ public void processDelayedAck(Object val, @Nullable Throwable err) {
+ // No-op.
}
@Override
@@ -77,12 +99,12 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
@Override
public HybridTimestamp readTimestamp() {
- return readTimestamp;
+ return null;
}
@Override
public HybridTimestamp schemaTimestamp() {
- return readTimestamp;
+ return createTs;
}
@Override
@@ -112,26 +134,17 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
@Override
public CompletableFuture<Void> commitAsync() {
- return TransactionsExceptionMapperUtil.convertToPublicFuture(
- finish(true, readTimestamp, false, false),
- TX_COMMIT_ERR
- );
+ return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> rollbackAsync() {
- return TransactionsExceptionMapperUtil.convertToPublicFuture(
- finish(false, readTimestamp, false, false),
- TX_ROLLBACK_ERR
- );
+ return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> rollbackTimeoutExceededAsync() {
- return TransactionsExceptionMapperUtil.convertToPublicFuture(
- finish(false, readTimestamp, false, true),
- TX_ROLLBACK_ERR
- );
+ return nullCompletedFuture();
}
@Override
@@ -141,34 +154,28 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
boolean full,
boolean timeoutExceeded
) {
- assert !full : "Read-only transactions cannot be full.";
- assert !(commitIntent && timeoutExceeded) : "Transaction cannot commit
with timeout exceeded.";
-
- if (!finishGuard.compareAndSet(false, true)) {
- return nullCompletedFuture();
- }
-
observableTsTracker.update(executionTimestamp);
- txFuture.complete(null);
-
- ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(
- commitIntent,
- new TxIdAndTimestamp(readTimestamp, id()),
- timeoutExceeded);
-
- this.timeoutExceeded = timeoutExceeded;
-
- return txFuture;
+ return nullCompletedFuture();
}
@Override
public boolean isFinishingOrFinished() {
- return finishGuard.get();
+ return false;
}
@Override
public CompletableFuture<Void> kill() {
- return finish(false, readTimestamp, false, false);
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void commit() throws TransactionException {
+ // No-op.
+ }
+
+ @Override
+ public void rollback() throws TransactionException {
+ // No-op.
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index cd3705215a5..44e2c014a78 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -50,7 +50,6 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
* @param observableTsTracker Observable timestamp tracker.
* @param id The id.
* @param txCoordinatorId Transaction coordinator inconsistent ID.
- * @param implicit True for an implicit transaction, false for an ordinary
one.
* @param timeout The timeout.
* @param readTimestamp The read timestamp.
*/
@@ -59,12 +58,11 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
HybridTimestampTracker observableTsTracker,
UUID id,
UUID txCoordinatorId,
- boolean implicit,
long timeout,
HybridTimestamp readTimestamp,
CompletableFuture<Void> txFuture
) {
- super(txManager, observableTsTracker, id, txCoordinatorId, implicit,
timeout);
+ super(txManager, observableTsTracker, id, txCoordinatorId, false,
timeout);
this.readTimestamp = readTimestamp;
this.txFuture = txFuture;
@@ -152,7 +150,7 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
txFuture.complete(null);
- ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(
+ ((TxManagerImpl) txManager).onCompleteReadOnlyTransaction(
commitIntent,
new TxIdAndTimestamp(readTimestamp, id()),
timeoutExceeded);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 1b9ef100004..20b77239fa3 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -302,4 +302,14 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
public CompletableFuture<Void> kill() {
return finishInternal(false, null, false, false, false);
}
+
+ /**
+ * Fail the transaction with exception so finishing it is not possible.
+ *
+ * @param e Fail reason.
+ */
+ public void fail(TransactionException e) {
+ // Thread safety is not needed.
+ finishFuture = failedFuture(e);
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 40a60feaecd..91ad9d5dc63 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -37,17 +37,17 @@ import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -119,8 +119,9 @@ import
org.apache.ignite.internal.tx.views.LocksViewProvider;
import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -184,9 +185,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
/** Prevents double stopping of the tracker. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
- /** Busy lock to stop synchronously. */
- private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
/** Detector of transactions that lost the coordinator. */
private final OrphanDetector orphanDetector;
@@ -241,6 +239,10 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
private final TransactionMetricsSource txMetrics;
+ private volatile boolean isStopping;
+
+ private final ConcurrentLinkedQueue<CompletableFuture<?>> stopFuts = new
ConcurrentLinkedQueue<>();
+
/**
* Test-only constructor.
*
@@ -404,18 +406,16 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
PrimaryReplicaEventParameters eventParameters,
Consumer<ReplicationGroupId> action
) {
- return inBusyLock(busyLock, () -> {
- assertReplicationGroupType(eventParameters.groupId());
+ assertReplicationGroupType(eventParameters.groupId());
- // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 -
remove check for TablePartitionId.
- if (!(eventParameters.groupId() instanceof TablePartitionId) &&
!(eventParameters.groupId() instanceof ZonePartitionId)) {
- return falseCompletedFuture();
- }
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 - remove
check for TablePartitionId.
+ if (!(eventParameters.groupId() instanceof TablePartitionId) &&
!(eventParameters.groupId() instanceof ZonePartitionId)) {
+ return falseCompletedFuture();
+ }
- action.accept(eventParameters.groupId());
+ action.accept(eventParameters.groupId());
- return falseCompletedFuture();
- });
+ return falseCompletedFuture();
}
private CompletableFuture<Boolean>
primaryReplicaElectedListener(PrimaryReplicaEventParameters eventParameters) {
@@ -434,41 +434,32 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker, boolean readOnly) {
- return begin(timestampTracker, true, readOnly,
InternalTxOptions.defaults());
- }
+ if (readOnly) {
+ return new ReadOnlyImplicitTransactionImpl(timestampTracker,
clockService.current());
+ }
- @Override
- public InternalTransaction beginExplicit(HybridTimestampTracker
timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
- return begin(timestampTracker, false, readOnly, txOptions);
- }
+ HybridTimestamp beginTimestamp =
createBeginTimestampWithIncrementRwTxCounter();
+ var tx = beginReadWriteTransaction(timestampTracker, beginTimestamp,
true, InternalTxOptions.defaults());
- private InternalTransaction begin(
- HybridTimestampTracker timestampTracker,
- boolean implicit,
- boolean readOnly,
- InternalTxOptions options
- ) {
- return inBusyLock(busyLock, () -> beginBusy(timestampTracker,
implicit, readOnly, options));
+ txStateVolatileStorage.initialize(tx);
+ txMetrics.onTransactionStarted();
+
+ return tx;
}
- private InternalTransaction beginBusy(
- HybridTimestampTracker timestampTracker,
- boolean implicit,
- boolean readOnly,
- InternalTxOptions options
- ) {
+ @Override
+ public InternalTransaction beginExplicit(HybridTimestampTracker
timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
InternalTransaction tx;
if (readOnly) {
- HybridTimestamp beginTimestamp = clockService.now();
- tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp,
implicit, options);
+ HybridTimestamp beginTimestamp = clockService.now(); // Tick to
generate new unique id.
+ tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp,
txOptions);
} else {
HybridTimestamp beginTimestamp =
createBeginTimestampWithIncrementRwTxCounter();
- tx = beginReadWriteTransaction(timestampTracker, beginTimestamp,
implicit, options);
+ tx = beginReadWriteTransaction(timestampTracker, beginTimestamp,
false, txOptions);
}
txStateVolatileStorage.initialize(tx);
-
txMetrics.onTransactionStarted();
return tx;
@@ -497,10 +488,13 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
// Implicit transactions are finished as soon as their operation/query
is finished, they cannot be abandoned, so there is
// no need to register them.
// TODO: https://issues.apache.org/jira/browse/IGNITE-24229 - schedule
expiration for multi-key implicit transactions?
- boolean scheduleExpiration = !implicit;
-
- if (scheduleExpiration) {
+ if (!implicit) {
transactionExpirationRegistry.register(transaction);
+
+ if (isStopping) {
+ transaction.fail(new
TransactionException(Common.NODE_STOPPING_ERR,
+ "Failed to finish the transaction because a node is
stopping: [txId=" + txId + ']'));
+ }
}
return transaction;
@@ -513,7 +507,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
private ReadOnlyTransactionImpl beginReadOnlyTransaction(
HybridTimestampTracker timestampTracker,
HybridTimestamp beginTimestamp,
- boolean implicit,
InternalTxOptions options
) {
UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp,
options.priority());
@@ -532,7 +525,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
if (!lockAcquired) {
throw new IgniteInternalException(
TX_READ_ONLY_TOO_OLD_ERR,
- "Timestamp of read-only transaction must be greater than
the low watermark: [txTimestamp={}, lowWatermark={}]",
+ "Attempted to read data below the garbage collection
watermark: [readTimestamp={}, gcTimestamp={}]",
readTimestamp,
lowWatermark.getLowWatermark());
}
@@ -543,25 +536,15 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
long timeout = getTimeoutOrDefault(options,
txConfig.readOnlyTimeoutMillis().value());
var transaction = new ReadOnlyTransactionImpl(
- this, timestampTracker, txId, localNodeId, implicit,
timeout, readTimestamp, txFuture
+ this, timestampTracker, txId, localNodeId, timeout,
readTimestamp, txFuture
);
- // Implicit transactions are finished as soon as their
operation/query is finished, they cannot be abandoned, so there is
- // no need to register them.
- // TODO: https://issues.apache.org/jira/browse/IGNITE-24229 -
schedule expiration for multi-key implicit transactions?
- boolean scheduleExpiration = !implicit;
-
- if (scheduleExpiration) {
- transactionExpirationRegistry.register(transaction);
- }
+ transactionExpirationRegistry.register(transaction);
txFuture.whenComplete((unused, throwable) -> {
lowWatermark.unlock(txId);
- // We only register explicit transactions, so we only
unregister them as well.
- if (scheduleExpiration) {
- transactionExpirationRegistry.unregister(transaction);
- }
+ transactionExpirationRegistry.unregister(transaction);
});
return transaction;
@@ -578,9 +561,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
* @return Current read timestamp.
*/
private HybridTimestamp currentReadTimestamp(HybridTimestamp beginTx) {
- return beginTx.subtractPhysicalTime(
- idleSafeTimePropagationPeriodMsSupplier.getAsLong() +
clockService.maxClockSkewMillis()
- );
+ return
beginTx.subtractPhysicalTime(idleSafeTimePropagationPeriodMsSupplier.getAsLong()
+ clockService.maxClockSkewMillis());
}
@Override
@@ -780,6 +761,19 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
.thenCompose(r -> verificationFuture);
}
+ private <T> CompletableFuture<T> trackFuture(CompletableFuture<T> fut) {
+ if (fut.isDone()) {
+ return fut;
+ }
+
+ if (isStopping) {
+ // Track finish future.
+ stopFuts.add(fut);
+ }
+
+ return fut;
+ }
+
/**
* Durable finish request.
*/
@@ -792,7 +786,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
HybridTimestamp commitTimestamp,
CompletableFuture<TransactionMeta> txFinishFuture
) {
- return inBusyLockAsync(busyLock, () ->
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition)
+ return
trackFuture(placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition)
.thenCompose(meta ->
sendFinishRequest(
observableTimestampTracker,
@@ -993,49 +987,47 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
- return inBusyLockAsync(busyLock, () -> {
- var deadlockPreventionPolicy = new
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
+ var deadlockPreventionPolicy = new
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
- // TODO https://issues.apache.org/jira/browse/IGNITE-23539
- lockManager.start(deadlockPreventionPolicy);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23539
+ lockManager.start(deadlockPreventionPolicy);
- localNodeId = topologyService.localMember().id();
+ localNodeId = topologyService.localMember().id();
- messagingService.addMessageHandler(ReplicaMessageGroup.class,
this);
+ messagingService.addMessageHandler(ReplicaMessageGroup.class, this);
- persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(
- replicaService,
- topologyService.localMember(),
- clockService,
- placementDriver,
- failureProcessor
- );
+ persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(
+ replicaService,
+ topologyService.localMember(),
+ clockService,
+ placementDriver,
+ failureProcessor
+ );
- txStateVolatileStorage.start();
+ txStateVolatileStorage.start();
- txViewProvider.init(localNodeId,
txStateVolatileStorage.statesMap());
+ txViewProvider.init(localNodeId, txStateVolatileStorage.statesMap());
- orphanDetector.start(txStateVolatileStorage,
- () -> longProperty(systemCfg, ABANDONED_CHECK_TS_PROP,
ABANDONED_CHECK_TS_PROP_DEFAULT_VALUE));
+ orphanDetector.start(txStateVolatileStorage,
+ () -> longProperty(systemCfg, ABANDONED_CHECK_TS_PROP,
ABANDONED_CHECK_TS_PROP_DEFAULT_VALUE));
- txCleanupRequestSender.start();
+ txCleanupRequestSender.start();
- txCleanupRequestHandler.start();
+ txCleanupRequestHandler.start();
-
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaExpiredListener);
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaExpiredListener);
-
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
primaryReplicaElectedListener);
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
primaryReplicaElectedListener);
- transactionExpirationJobFuture =
commonScheduler.scheduleAtFixedRate(this::expireTransactionsUpToNow,
- EXPIRE_FREQ_MILLIS, EXPIRE_FREQ_MILLIS, MILLISECONDS);
+ transactionExpirationJobFuture =
commonScheduler.scheduleAtFixedRate(this::expireTransactionsUpToNow,
+ EXPIRE_FREQ_MILLIS, EXPIRE_FREQ_MILLIS, MILLISECONDS);
- lockRetryCount = toIntExact(longProperty(systemCfg,
LOCK_RETRY_COUNT_PROP, LOCK_RETRY_COUNT_PROP_DEFAULT_VALUE));
+ lockRetryCount = toIntExact(longProperty(systemCfg,
LOCK_RETRY_COUNT_PROP, LOCK_RETRY_COUNT_PROP_DEFAULT_VALUE));
- metricsManager.registerSource(txMetrics);
- metricsManager.enable(txMetrics);
+ metricsManager.registerSource(txMetrics);
+ metricsManager.enable(txMetrics);
- return nullCompletedFuture();
- });
+ return nullCompletedFuture();
}
private void expireTransactionsUpToNow() {
@@ -1051,6 +1043,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public void beforeNodeStop() {
+ isStopping = true;
orphanDetector.stop();
}
@@ -1060,26 +1053,39 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return nullCompletedFuture();
}
- busyLock.block();
+ // Wait all pending finish futures.
+ List<CompletableFuture<?>> toWait = new ArrayList<>();
+
+ for (CompletableFuture<?> stopFut : stopFuts) {
+ if (!stopFut.isDone()) {
+ toWait.add(stopFut);
+ }
+ }
+
+ stopFuts.clear();
- txStateVolatileStorage.stop();
+ LOG.debug("Waiting for tx finish futures on shutdown [cnt=" +
toWait.size() + ']');
- txCleanupRequestHandler.stop();
+ return CompletableFutures.allOf(toWait).handle((r, e) -> {
+ txStateVolatileStorage.stop();
-
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaExpiredListener);
+ txCleanupRequestHandler.stop();
-
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
primaryReplicaElectedListener);
+
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaExpiredListener);
- ScheduledFuture<?> expirationJobFuture =
transactionExpirationJobFuture;
- if (expirationJobFuture != null) {
- expirationJobFuture.cancel(false);
- }
+
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
primaryReplicaElectedListener);
+
+ ScheduledFuture<?> expirationJobFuture =
transactionExpirationJobFuture;
+ if (expirationJobFuture != null) {
+ expirationJobFuture.cancel(false);
+ }
- transactionExpirationRegistry.abortAllRegistered();
+ transactionExpirationRegistry.abortAllRegistered();
- shutdownAndAwaitTermination(writeIntentSwitchPool, 10,
TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(writeIntentSwitchPool, 10,
TimeUnit.SECONDS);
- return nullCompletedFuture();
+ return null;
+ });
}
@Override
@@ -1163,7 +1169,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return runAsync(runnable, writeIntentSwitchPool);
}
- void completeReadOnlyTransactionFuture(boolean commitIntent,
TxIdAndTimestamp txIdAndTimestamp, boolean timeoutExceeded) {
+ void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp
txIdAndTimestamp, boolean timeoutExceeded) {
UUID txId = txIdAndTimestamp.getTxId();
txMetrics.onReadOnlyTransactionFinished(txId, commitIntent);
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 44bdbe3a6a8..48d339d57bc 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -244,12 +244,26 @@ public class TxManagerTest extends IgniteAbstractTest {
ReplicationGroupId partitionIdForEnlistment =
targetReplicationGroupId(1, 0);
+ tx.assignCommitPartition(partitionIdForEnlistment);
tx.enlist(partitionIdForEnlistment, 10, REMOTE_NODE.name(), 1L);
PendingTxPartitionEnlistment actual =
tx.enlistedPartition(partitionIdForEnlistment);
assertEquals(REMOTE_NODE.name(), actual.primaryNodeConsistentId());
assertEquals(1L, actual.consistencyToken());
assertEquals(Set.of(10), actual.tableIds());
+
+ // Avoid NPE on finish.
+ var meta = mock(ReplicaMeta.class);
+ when(meta.getStartTime()).thenReturn(clock.current());
+ when(meta.getExpirationTime()).thenReturn(clock.current());
+ when(meta.getLeaseholder()).thenReturn("test");
+ when(meta.getLeaseholderId()).thenReturn(randomUUID());
+
+ when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any())).thenReturn(completedFuture(meta));
+
+ HybridTimestamp commitTimestamp = clockService.now();
+ when(replicaService.invoke(anyString(),
any(TxFinishReplicaRequest.class)))
+ .thenReturn(completedFuture(new
TransactionResult(TxState.COMMITTED, commitTimestamp)));
}
// TODO: IGNITE-22522 - inline this after switching to ZonePartitionId.
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index bda33912133..4ce5d3a18a6 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -47,7 +47,6 @@ class ReadOnlyTransactionImplTest extends
BaseIgniteAbstractTest {
HybridTimestampTracker.atomicTracker(null),
txId,
new UUID(1, 2),
- false,
10_000,
readTimestamp,
new CompletableFuture<>()