This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2479484c6e7 IGNITE-25844 Add transactions metric source (#6206)
2479484c6e7 is described below
commit 2479484c6e79cd47a874cc47db5ef891ce67ea09
Author: Slava Koptilin <[email protected]>
AuthorDate: Wed Jul 16 10:06:22 2025 +0300
IGNITE-25844 Add transactions metric source (#6206)
---
.../ignite/internal/cli/CliIntegrationTest.java | 3 +-
.../apache/ignite/client/fakes/FakeTxManager.java | 2 +-
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../internal/metrics/AbstractMetricSource.java | 10 +
.../partition/replicator/fixtures/Node.java | 3 +-
...xStateCommitPartitionReplicaRequestHandler.java | 2 +-
.../rest/metrics/ItMetricControllerTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../apache/ignite/internal/table/NodeUtils.java | 25 +-
.../exec/rel/TableScanNodeExecutionTest.java | 4 +-
modules/table/build.gradle | 1 +
.../apache/ignite/distributed/ItLockTableTest.java | 4 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 4 +-
.../ignite/internal/table/ItColocationTest.java | 4 +-
.../ignite/internal/table/AbstractTableView.java | 7 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 7 +-
.../table/impl/DummyInternalTableImpl.java | 4 +-
modules/transactions/build.gradle | 1 +
.../internal/tx/ItTransactionMetricsTest.java | 418 +++++++++++++++++++++
.../internal/tx/PendingTxPartitionEnlistment.java | 8 +-
.../org/apache/ignite/internal/tx/TxManager.java | 8 +-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 5 +-
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 14 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 5 +-
.../internal/tx/impl/TransactionInflights.java | 15 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 74 ++--
.../tx/metrics/TransactionMetricsSource.java | 211 +++++++++++
.../tx/views/TransactionsViewProvider.java | 10 +-
.../internal/tx/TransactionMetricSourceTest.java | 72 ++++
.../apache/ignite/internal/tx/TxManagerTest.java | 4 +-
31 files changed, 866 insertions(+), 71 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 1682352b736..69a19ee0989 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -75,7 +75,8 @@ public abstract class CliIntegrationTest extends
ClusterPerClassIntegrationTest
new MetricSource().name("topology.local").enabled(true),
new
MetricSource().name("thread.pools.partitions-executor").enabled(true),
new MetricSource().name("thread.pools.sql-executor").enabled(true),
- new
MetricSource().name("thread.pools.sql-planning-executor").enabled(true)
+ new
MetricSource().name("thread.pools.sql-planning-executor").enabled(true),
+ new MetricSource().name("transactions").enabled(true)
};
/** Correct ignite jdbc url. */
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 4d717d1e533..0038020725f 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -219,7 +219,7 @@ public class FakeTxManager implements TxManager {
public CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
ReplicationGroupId commitPartition,
- boolean commit,
+ boolean commitIntent,
boolean timeoutExceeded,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 2ceb1b26fb7..a065a67a8c1 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1515,7 +1515,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
resourcesRegistry,
transactionInflights,
lowWatermark,
- commonScheduledExecutorService
+ commonScheduledExecutorService,
+ metricManager
);
rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
index c686245381d..901d39a8635 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
@@ -54,6 +54,16 @@ public abstract class AbstractMetricSource<T extends
AbstractMetricSource.Holder
this(name, null, null);
}
+ /**
+ * Base constructor for all metric source implementations.
+ *
+ * @param name Metric source name.
+ * @param description Description.
+ */
+ protected AbstractMetricSource(String name, String description) {
+ this(name, description, null);
+ }
+
/**
* Base constructor for all metric source implementations.
*
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index d162150c090..9d561dc474a 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -627,7 +627,8 @@ public class Node {
resourcesRegistry,
transactionInflights,
lowWatermark,
- threadPoolsManager.commonScheduler()
+ threadPoolsManager.commonScheduler(),
+ metricManager
);
volatileLogStorageFactoryCreator = new
VolatileLogStorageFactoryCreator(name, workDir.resolve("volatile-log-spillout-"
+ name));
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index f4178d49164..9408333f0b0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -113,7 +113,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
||
clusterNodeResolver.getById(txStateMeta.txCoordinatorId()) == null) {
// This means that primary replica for commit partition has
changed, since the local node doesn't have the volatile tx
// state; and there is no final tx state in txStateStorage, or
the tx coordinator left the cluster. But we can assume
- // that as the coordinator (or information about it) is
missing, there is no need to wait a finish request from
+ // that as the coordinator (or information about it) is
missing, there is no need to wait a finish request from
// tx coordinator, the transaction can't be committed at all.
return txRecoveryEngine.triggerTxRecovery(txId, localNode.id())
.handle((v, ex) ->
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 1869bc1df7a..df9a334484b 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -62,7 +62,8 @@ class ItMetricControllerTest extends
ClusterPerClassIntegrationTest {
new MetricSource("topology.local", true),
new MetricSource("thread.pools.partitions-executor", true),
new MetricSource("thread.pools.sql-executor", true),
- new MetricSource("thread.pools.sql-planning-executor", true)
+ new MetricSource("thread.pools.sql-planning-executor", true),
+ new MetricSource("transactions", true)
};
@Inject
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1df60705030..421793a1e89 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -648,7 +648,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
lowWatermark,
threadPoolsManager.commonScheduler(),
failureProcessor,
- nodeProperties
+ nodeProperties,
+ metricManager
);
ResourceVacuumManager resourceVacuumManager = new
ResourceVacuumManager(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1bf76d2701a..f383fb7c249 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1045,7 +1045,8 @@ public class IgniteImpl implements Ignite {
lowWatermark,
threadPoolsManager.commonScheduler(),
failureManager,
- nodeProperties
+ nodeProperties,
+ metricManager
);
sharedTxStateStorage = new TxStateRocksDbSharedStorage(
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
index 48def102693..779d88107f7 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
@@ -153,8 +153,20 @@ public class NodeUtils {
return preferablePrimary[0];
}
- private static void stopLeaseProlongation(Collection<IgniteImpl> nodes,
IgniteImpl leaseholderNode, ReplicationGroupId groupId,
- String preferablePrimary) {
+ /**
+ * Stops lease prolongation for the given replication group.
+ *
+ * @param nodes Cluster nodes.
+ * @param leaseholderNode Current Lease holder.
+ * @param groupId Replication group id.
+ * @param preferablePrimary Preferable primary.
+ */
+ public static void stopLeaseProlongation(
+ Collection<IgniteImpl> nodes,
+ IgniteImpl leaseholderNode,
+ ReplicationGroupId groupId,
+ @Nullable String preferablePrimary
+ ) {
StopLeaseProlongationMessage msg =
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
.groupId(groupId)
.redirectProposal(preferablePrimary)
@@ -165,7 +177,14 @@ public class NodeUtils {
);
}
- private static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId
groupId) {
+ /**
+ * Returns a replica meta information for the given replication group.
+ *
+ * @param node Ignite node to be used for getting a meta.
+ * @param groupId Replication group id.
+ * @return Replica meta.
+ */
+ public static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId
groupId) {
CompletableFuture<ReplicaMeta> leaseholderFuture =
node.placementDriver().awaitPrimaryReplica(
groupId,
node.clock().now(),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index c11db985f60..a1a09307800 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
@@ -193,7 +194,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
resourcesRegistry,
transactionInflights,
new TestLowWatermark(),
- commonExecutor
+ commonExecutor,
+ new NoOpMetricManager()
);
assertThat(txManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index f7b5ee7c88c..8b244242c92 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -154,6 +154,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-replicator'))
integrationTestImplementation
testFixtures(project(':ignite-marshaller-common'))
integrationTestImplementation testFixtures(project(':ignite-catalog'))
+ integrationTestImplementation testFixtures(project(':ignite-metrics'))
integrationTestImplementation libs.fastutil.core
integrationTestImplementation libs.calcite.core
integrationTestImplementation libs.rocksdb.jni
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index 6c7c9c2ae4a..38582fb3561 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -161,7 +162,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
resourcesRegistry,
transactionInflights,
lowWatermark,
- commonExecutor
+ commonExecutor,
+ new TestMetricManager()
);
}
};
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 2b251d34ce2..52b626e570e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.ClusterService;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
@@ -148,7 +149,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
resourcesRegistry,
transactionInflights,
lowWatermark,
- commonExecutor
+ commonExecutor,
+ new TestMetricManager()
) {
@Override
public CompletableFuture<Void>
executeWriteIntentSwitchAsync(Runnable runnable) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index cbd6c643228..320a8b15c9d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -221,7 +222,8 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
resourcesRegistry,
transactionInflights,
new TestLowWatermark(),
- commonExecutor
+ commonExecutor,
+ new NoOpMetricManager()
) {
@Override
public CompletableFuture<Void> finish(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 685b030fea8..916ba0259f5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -134,8 +134,11 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
return withSchemaSync((InternalTransaction) tx, null, action);
}
- private <T> CompletableFuture<T> withSchemaSync(@Nullable
InternalTransaction tx, @Nullable Integer previousSchemaVersion,
- KvAction<T> action) {
+ private <T> CompletableFuture<T> withSchemaSync(
+ @Nullable InternalTransaction tx,
+ @Nullable Integer previousSchemaVersion,
+ KvAction<T> action
+ ) {
CompletableFuture<Integer> schemaVersionFuture =
isDirectFlowApplicableTx(tx)
? schemaVersions.schemaVersionAtCurrentTime(tbl.tableId())
: schemaVersions.schemaVersionAt(tx.schemaTimestamp(),
tbl.tableId());
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 606a89b2888..6bf0be899f9 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
@@ -614,7 +615,8 @@ public class ItTxTestCluster {
lowWatermark,
executor,
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TestMetricManager()
);
}
@@ -1328,7 +1330,8 @@ public class ItTxTestCluster {
lowWatermark,
executor,
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TestMetricManager()
);
clientResourceVacuumManager = new ResourceVacuumManager(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3da82c686e4..89154a6e117 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.AbstractMessagingService;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -695,7 +696,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
resourcesRegistry,
transactionInflights,
new TestLowWatermark(),
- COMMON_SCHEDULER
+ COMMON_SCHEDULER,
+ new NoOpMetricManager()
);
assertThat(txManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git a/modules/transactions/build.gradle
b/modules/transactions/build.gradle
index 2b7e830734a..76189930ad0 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -58,6 +58,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-placement-driver-api'))
testImplementation testFixtures(project(':ignite-low-watermark'))
testImplementation testFixtures(project(':ignite-transactions'))
+ testImplementation testFixtures(project(':ignite-metrics'))
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-cluster-management')
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
new file mode 100644
index 00000000000..0d615b20b6c
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.NodeUtils;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests transaction metrics.
+ */
+public class ItTransactionMetricsTest extends ClusterPerClassIntegrationTest {
+ public static final String TABLE_NAME = "test_table_name";
+
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @BeforeAll
+ void createTable() throws Exception {
+ sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val
VARCHAR)");
+
+ awaitAssignmentsStabilizationOnDefaultZone(CLUSTER.aliveNode());
+ }
+
+ /**
+ * Returns a key value view for the table {@link #TABLE_NAME}.
+ *
+ * @param nodeIndex Node index to create a key value view.
+ * @return Key value view.
+ */
+ private static KeyValueView<Integer, String> keyValueView(int nodeIndex) {
+ return keyValueView(CLUSTER.node(nodeIndex));
+ }
+
+ /**
+ * Returns a key value view for the table {@link #TABLE_NAME}.
+ *
+ * @param node Node to create a key value view.
+ * @return Key value view.
+ */
+ private static KeyValueView<Integer, String> keyValueView(Ignite node) {
+ return node.tables().table(TABLE_NAME).keyValueView(Integer.class,
String.class);
+ }
+
+ /**
+ * Returns a snapshot of transaction metrics for the given node.
+ *
+ * @param nodeIndex Node index to capture transaction metrics.
+ * @return Snapshot of transaction metrics.
+ */
+ private static Map<String, Long> metricValues(int nodeIndex) {
+ var values = new HashMap<String, Long>();
+
+ MetricSet txMetrics = unwrapIgniteImpl(node(nodeIndex))
+ .metricManager()
+ .metricSnapshot()
+ .metrics()
+ .get(TransactionMetricsSource.SOURCE_NAME);
+
+ txMetrics.iterator().forEachRemaining(metric -> {
+ if (metric instanceof LongMetric) {
+ values.put(metric.name(), ((LongMetric) metric).value());
+ }
+ });
+
+ return values;
+ }
+
+ private static void testMetricValues(Map<String, Long> initial,
Map<String, Long> actual, String... ignored) {
+ assertThat("Number of metrics should be the same.", initial.size(),
is(actual.size()));
+
+ var exclude = Set.of(ignored);
+
+ for (Map.Entry<String, Long> e : initial.entrySet()) {
+ if (!exclude.contains(e.getKey())) {
+ assertThat("Metric name = " + e.getKey(),
actual.get(e.getKey()), is(e.getValue()));
+ }
+ }
+ }
+
+ /**
+ * Tests that TotalCommits and RoCommits are incremented when a read-only
transaction is successfully committed.
+ *
+ * @param implicit {@code true} if a transaction should be implicit and
{@code false} otherwise.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testCommitReadOnlyTransaction(boolean implicit) {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ Transaction tx = implicit ? null : node(0).transactions().begin(new
TransactionOptions().readOnly(true));
+ keyValueView(0).get(tx, 12);
+ if (!implicit) {
+ tx.commit();
+ }
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics are not changed except
TotalCommits and RoCommits.
+ testMetricValues(metrics0, actualMetrics0, "TotalCommits",
"RoCommits");
+
+ assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + 1));
+ assertThat(actualMetrics0.get("RoCommits"),
is(metrics0.get("RoCommits") + 1));
+ }
+
+ /**
+ * Tests that TotalCommits and RwCommits are incremented when a read-write
transaction is successfully committed.
+ *
+ * @param implicit {@code true} if a transaction should be implicit and
{@code false} otherwise.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testCommitReadWriteTransaction(boolean implicit) {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ Transaction tx = implicit ? null : node(0).transactions().begin();
+ keyValueView(0).put(tx, 12, "value");
+ if (!implicit) {
+ tx.commit();
+ }
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics are not changed except
TotalCommits and RwCommits.
+ testMetricValues(metrics0, actualMetrics0, "TotalCommits",
"RwCommits");
+
+ assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + 1));
+ assertThat(actualMetrics0.get("RwCommits"),
is(metrics0.get("RwCommits") + 1));
+ }
+
+ /**
+ * Tests that TotalCommits and RwCommits/RoCommits are incremented when an
"empty" transaction is successfully committed.
+ * Empty means that there are no entries enlisted into the transaction.
+ */
+ @Test
+ void testCommitEmptyTransaction() {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ Transaction rwTx = node(0).transactions().begin();
+ rwTx.commit();
+
+ Transaction roTx = node(0).transactions().begin(new
TransactionOptions().readOnly(true));
+ roTx.commit();
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics are not changed except
TotalCommits, RwCommits and RoCommits.
+ testMetricValues(metrics0, actualMetrics0, "TotalCommits",
"RwCommits", "RoCommits");
+
+ assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + 2));
+ assertThat(actualMetrics0.get("RwCommits"),
is(metrics0.get("RwCommits") + 1));
+ assertThat(actualMetrics0.get("RoCommits"),
is(metrics0.get("RoCommits") + 1));
+ }
+
+ /**
+ * Tests that TotalRollbacks and RwRollbacks/RoRollbacks are incremented
when a transaction is rolled back.
+ */
+ @Test
+ void testRollbackTransaction() {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ Transaction rwTx = node(0).transactions().begin();
+ keyValueView(0).put(rwTx, 12, "value");
+ rwTx.rollback();
+
+ Transaction roTx = node(0).transactions().begin(new
TransactionOptions().readOnly(true));
+ keyValueView(0).get(roTx, 12);
+ roTx.rollback();
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics are not changed except
TotalRollbacks, RwRollbacks and RoRollbacks.
+ testMetricValues(metrics0, actualMetrics0, "TotalRollbacks",
"RwRollbacks", "RoRollbacks");
+
+ assertThat(actualMetrics0.get("TotalRollbacks"),
is(metrics0.get("TotalRollbacks") + 2));
+ assertThat(actualMetrics0.get("RwRollbacks"),
is(metrics0.get("RwRollbacks") + 1));
+ assertThat(actualMetrics0.get("RoRollbacks"),
is(metrics0.get("RoRollbacks") + 1));
+ }
+
+ /**
+ * Tests that TotalRollbacks and RwRollbacks/RoRollbacks are incremented
when a transaction is rolled back due to a timeout.
+ */
+ @Test
+ void testTimeoutRollbackTransaction() throws Exception {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ Transaction rwTx = node(0).transactions().begin(new
TransactionOptions().timeoutMillis(1000));
+ keyValueView(0).put(rwTx, 12, "value");
+
+ Transaction roTx = node(0).transactions().begin(new
TransactionOptions().readOnly(true).timeoutMillis(1000));
+ keyValueView(0).get(roTx, 12);
+
+ // wait for completion of the transactions due to timeout.
+ assertThat(waitForCondition(() -> {
+ Map<String, Long> m = metricValues(0);
+
+ boolean total = m.get("TotalRollbacks") ==
metrics0.get("TotalRollbacks") + 2;
+ boolean rw = m.get("RwRollbacks") == metrics0.get("RwRollbacks") +
1;
+ boolean ro = m.get("RoRollbacks") == metrics0.get("RoRollbacks") +
1;
+
+ return total && rw && ro;
+ }, 5_000), is(true));
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, metricValues(1));
+ }
+
+ /**
+ * Tests that TotalRollbacks and RwRollbacks are incremented when a
transaction is rolled back due to a deadlock.
+ */
+ @Test
+ void testDeadlockTransaction() throws Exception {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ KeyValueView<Integer, String> kv = keyValueView(0);
+
+ Transaction rwTx1 = node(0).transactions().begin();
+ Transaction rwTx2 = node(0).transactions().begin();
+
+ kv.put(rwTx1, 12, "value");
+ kv.put(rwTx2, 24, "value");
+
+ CompletableFuture<?> asyncOp1 = kv.getAsync(rwTx1, 24);
+ CompletableFuture<?> asyncOp2 = kv.getAsync(rwTx2, 12);
+
+ assertThat(waitForCondition(() -> asyncOp1.isDone() &&
asyncOp2.isDone(), 5_000), is(true));
+
+ rwTx1.commit();
+ // rwTx2 should be rolled back due to a deadlock
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics were not changed except
TotalRollbacks, TotalCommits, RwRollbacks and RwCommits.
+ testMetricValues(metrics0, actualMetrics0, "TotalRollbacks",
"TotalCommits", "RwRollbacks", "RwCommits");
+
+ assertThat(actualMetrics0.get("TotalRollbacks"),
is(metrics0.get("TotalRollbacks") + 1));
+ assertThat(actualMetrics0.get("RwRollbacks"),
is(metrics0.get("RwRollbacks") + 1));
+
+ assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + 1));
+ assertThat(actualMetrics0.get("RwCommits"),
is(metrics0.get("RwCommits") + 1));
+ }
+
+ /**
+ * Tests that TotalRollbacks and RwRollbacks are incremented when a
transaction is rolled back due to a lease expiration.
+ */
+ @Test
+ void testRollbackTransactionOnLeaseExpiration() {
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ int key = 12;
+
+ TableImpl table = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
+
+ int partitionId = table.partitionId(Tuple.create().set("id", key));
+
+ Transaction tx = node(0).transactions().begin();
+
+ keyValueView(0).put(tx, key, "value");
+
+ // The group that owns the key is a zone partition or a table partition,
+ // depending on whether colocation is enabled.
+ ReplicationGroupId replicationGroupId = colocationEnabled()
+ ? new ZonePartitionId(table.zoneId(), partitionId)
+ : new TablePartitionId(table.tableId(), partitionId);
+
+ ReplicaMeta leaseholder =
NodeUtils.leaseholder(unwrapIgniteImpl(node(0)), replicationGroupId);
+
+ IgniteImpl leaseholderNode = CLUSTER
+ .runningNodes()
+ .filter(n ->
n.cluster().localNode().id().equals(leaseholder.getLeaseholderId()))
+ .findFirst()
+ .map(TestWrappers::unwrapIgniteImpl)
+ .orElseThrow();
+
+ // Stop prolongation for the same group the leaseholder was resolved for,
+ // so that both colocation modes exercise the lease that is awaited below.
+ NodeUtils.stopLeaseProlongation(
+
CLUSTER.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(toSet()),
+ leaseholderNode,
+ replicationGroupId,
+ null
+ );
+
+ // Wait for the lease expiration.
+ unwrapIgniteImpl(node(0))
+ .clockService()
+ .waitFor(leaseholder.getExpirationTime().tick())
+ .orTimeout(10, TimeUnit.SECONDS)
+ .join();
+
+ assertThrows(TransactionException.class, tx::commit, null);
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics were not changed except
TotalRollbacks and RwRollbacks.
+ testMetricValues(metrics0, actualMetrics0, "TotalRollbacks",
"RwRollbacks");
+
+ assertThat(actualMetrics0.get("TotalRollbacks"),
is(metrics0.get("TotalRollbacks") + 1));
+ assertThat(actualMetrics0.get("RwRollbacks"),
is(metrics0.get("RwRollbacks") + 1));
+ }
+
+ /**
+ * Tests that TotalCommits, RwCommits and RoCommits are incremented when a
SQL engine is used.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testSqlTransaction(boolean implicit) {
+ Object[] emptyArgs = new Object[0];
+
+ Map<String, Long> metrics0 = metricValues(0);
+ Map<String, Long> metrics1 = metricValues(1);
+
+ // read-only transaction.
+ Transaction tx = implicit ? null : node(0).transactions().begin(new
TransactionOptions().readOnly(true));
+ sql(0, tx, "select * from " + TABLE_NAME, emptyArgs);
+ if (!implicit) {
+ tx.commit();
+ }
+
+ // read-write transaction.
+ tx = implicit ? null : node(0).transactions().begin();
+ sql(0, tx, "delete from " + TABLE_NAME, emptyArgs);
+ if (!implicit) {
+ tx.commit();
+ }
+
+ Map<String, Long> actualMetrics0 = metricValues(0);
+ Map<String, Long> actualMetrics1 = metricValues(1);
+
+ // Check that there are no updates on the node 1.
+ testMetricValues(metrics1, actualMetrics1);
+
+ // Check that all transaction metrics were not changed except
TotalCommits, RwCommits and RoCommits.
+ testMetricValues(metrics0, actualMetrics0, "TotalCommits",
"RwCommits", "RoCommits");
+
+ assertThat(actualMetrics0.get("TotalCommits"),
is(metrics0.get("TotalCommits") + 2));
+ assertThat(actualMetrics0.get("RoCommits"),
is(metrics0.get("RoCommits") + 1));
+ assertThat(actualMetrics0.get("RwCommits"),
is(metrics0.get("RwCommits") + 1));
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/PendingTxPartitionEnlistment.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/PendingTxPartitionEnlistment.java
index 5dc9d524cdb..0df04d20bfc 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/PendingTxPartitionEnlistment.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/PendingTxPartitionEnlistment.java
@@ -18,9 +18,10 @@
package org.apache.ignite.internal.tx;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.tostring.S;
/**
- * Partition enlistement information in a pending transaction. It stores
information needed before commit timestamp is generated.
+ * Partition enlistment information in a pending transaction. It stores
information needed before commit timestamp is generated.
*/
public class PendingTxPartitionEnlistment extends PartitionEnlistment {
private final long consistencyToken;
@@ -65,4 +66,9 @@ public class PendingTxPartitionEnlistment extends
PartitionEnlistment {
public long consistencyToken() {
return consistencyToken;
}
+
+ @Override
+ public String toString() {
+ return S.toString(PendingTxPartitionEnlistment.class, this,
super.toString());
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index ec62dddcc97..a1f416eabe9 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -160,8 +160,8 @@ public interface TxManager extends IgniteComponent {
* updated with commit timestamp of every committed transaction.
Not null on commit.
* @param txId Transaction id.
* @param ts The timestamp which is associated to txn completion.
- * @param commit {@code True} if a commit requested.
- * @param timeoutExceeded {@code True} if a timeout exceeded. 'commit' and
timeout must not be {@code} True at the same time.
+ * @param commit {@code true} if a commit requested.
+ * @param timeoutExceeded {@code true} if a timeout exceeded. 'commit' and
timeout must not be {@code true} at the same time.
*/
void finishFull(
HybridTimestampTracker timestampTracker, UUID txId, @Nullable
HybridTimestamp ts, boolean commit, boolean timeoutExceeded
@@ -173,7 +173,7 @@ public interface TxManager extends IgniteComponent {
* @param timestampTracker Observable timestamp tracker is used to
determine the read timestamp for read-only transactions. Each client
* should pass its own tracker to provide linearizability between
read-write and read-only transactions started by this client.
* @param commitPartition Partition to store a transaction state. {@code
null} if nothing was enlisted into the transaction.
- * @param commit {@code true} if a commit requested.
+ * @param commitIntent {@code true} if a commit requested.
* @param timeoutExceeded {@code true} if a timeout exceeded.
* @param enlistedGroups Map of enlisted partitions.
* @param txId Transaction id.
@@ -181,7 +181,7 @@ public interface TxManager extends IgniteComponent {
CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
@Nullable ReplicationGroupId commitPartition,
- boolean commit,
+ boolean commitIntent,
boolean timeoutExceeded,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index c48255d26c8..4a156acea15 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -54,8 +54,9 @@ public class TxStateMeta implements TransactionMeta {
private final @Nullable Boolean isFinishedDueToTimeout;
/**
- * The ignite transaction object is associated with this state. This field
can be initialized only on the transaction coordinator,
- * {@code null} in other nodes.
+ * The ignite transaction object is associated with this state.
+ * This field can be initialized only on the transaction coordinator,
{@code null} on the other nodes.
+ * Moreover, this field can be set to {@code null} even on the transaction
coordinator under certain circumstances.
*/
@IgniteToStringExclude
private final @Nullable InternalTransaction tx;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 1005708e5ff..cd3705215a5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -135,9 +135,14 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
}
@Override
- public CompletableFuture<Void> finish(boolean commit, HybridTimestamp
executionTimestamp, boolean full, boolean timeoutExceeded) {
+ public CompletableFuture<Void> finish(
+ boolean commitIntent,
+ HybridTimestamp executionTimestamp,
+ boolean full,
+ boolean timeoutExceeded
+ ) {
assert !full : "Read-only transactions cannot be full.";
- assert !(commit && timeoutExceeded) : "Transaction cannot commit with
timeout exceeded.";
+ assert !(commitIntent && timeoutExceeded) : "Transaction cannot commit
with timeout exceeded.";
if (!finishGuard.compareAndSet(false, true)) {
return nullCompletedFuture();
@@ -147,7 +152,10 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
txFuture.complete(null);
- ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()), timeoutExceeded);
+ ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(
+ commitIntent,
+ new TxIdAndTimestamp(readTimestamp, id()),
+ timeoutExceeded);
this.timeoutExceeded = timeoutExceeded;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 51256b96e96..1b9ef100004 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -191,7 +191,10 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
@Override
public CompletableFuture<Void> finish(
- boolean commit, @Nullable HybridTimestamp executionTimestamp,
boolean full, boolean timeoutExceeded
+ boolean commit,
+ @Nullable HybridTimestamp executionTimestamp,
+ boolean full,
+ boolean timeoutExceeded
) {
assert !(commit && timeoutExceeded) : "Transaction cannot commit with
timeout exceeded.";
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 134e0899914..4dc2230724f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -263,11 +263,16 @@ public class TransactionInflights {
}
CompletableFuture<Void> performFinish(boolean commit,
Function<Boolean, CompletableFuture<Void>> finishAction) {
- waitReadyToFinish(commit)
- .whenComplete((ignoredReadyToFinish, readyException) ->
finishAction.apply(commit && readyException == null)
- .whenComplete((ignoredFinishActionResult,
finishException) ->
- completeFinishInProgressFuture(commit,
readyException, finishException))
- );
+ waitReadyToFinish(commit).whenComplete((ignoredReadyToFinish,
readyException) -> {
+ try {
+ CompletableFuture<Void> actionFut =
finishAction.apply(commit && readyException == null);
+
+ actionFut.whenComplete((ignoredFinishActionResult,
finishException) ->
+ completeFinishInProgressFuture(commit,
readyException, finishException));
+ } catch (Throwable err) {
+ completeFinishInProgressFuture(commit, readyException,
err);
+ }
+ });
return finishInProgressFuture;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index eefde7504da..d6227984066 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
@@ -77,6 +76,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -113,6 +113,7 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import
org.apache.ignite.internal.tx.impl.DeadlockPreventionPolicyImpl.TxIdComparators;
import
org.apache.ignite.internal.tx.impl.TransactionInflights.ReadWriteTxContext;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.apache.ignite.internal.tx.views.LocksViewProvider;
import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
import org.apache.ignite.internal.util.CompletableFutures;
@@ -182,18 +183,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
/** Prevents double stopping of the tracker. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
- /**
- * Total number of started transaction.
- * TODO: IGNITE-21440 Implement transaction metrics.
- */
- private final LongAdder startedTxs = new LongAdder();
-
- /**
- * Total number of finished transaction.
- * TODO: IGNITE-21440 Implement transaction metrics.
- */
- private final LongAdder finishedTxs = new LongAdder();
-
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -247,6 +236,10 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
private volatile int lockRetryCount = 0;
+ private final MetricManager metricsManager;
+
+ private final TransactionMetricsSource txMetrics;
+
/**
* Test-only constructor.
*
@@ -263,6 +256,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
* @param resourcesRegistry Resources registry.
* @param transactionInflights Transaction inflights.
* @param lowWatermark Low watermark.
+ * @param metricManager Metric manager.
*/
@TestOnly
public TxManagerImpl(
@@ -279,7 +273,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
RemotelyTriggeredResourceRegistry resourcesRegistry,
TransactionInflights transactionInflights,
LowWatermark lowWatermark,
- ScheduledExecutorService commonScheduler
+ ScheduledExecutorService commonScheduler,
+ MetricManager metricManager
) {
this(
clusterService.nodeName(),
@@ -300,7 +295,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
lowWatermark,
commonScheduler,
new FailureManager(new NoOpFailureHandler()),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ metricManager
);
}
@@ -322,6 +318,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
* @param resourcesRegistry Resources registry.
* @param transactionInflights Transaction inflights.
* @param lowWatermark Low watermark.
+ * @param metricManager Metric manager.
*/
public TxManagerImpl(
String nodeName,
@@ -342,7 +339,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
LowWatermark lowWatermark,
ScheduledExecutorService commonScheduler,
FailureProcessor failureProcessor,
- NodeProperties nodeProperties
+ NodeProperties nodeProperties,
+ MetricManager metricManager
) {
this.txConfig = txConfig;
this.systemCfg = systemCfg;
@@ -363,6 +361,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
this.commonScheduler = commonScheduler;
this.failureProcessor = failureProcessor;
this.nodeProperties = nodeProperties;
+ this.metricsManager = metricManager;
placementDriverHelper = new PlacementDriverHelper(placementDriver,
clockService);
@@ -396,6 +395,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txCleanupRequestSender =
new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, txStateVolatileStorage);
+
+ txMetrics = new TransactionMetricsSource(clockService);
}
private CompletableFuture<Boolean> primaryReplicaEventListener(
@@ -455,8 +456,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
boolean readOnly,
InternalTxOptions options
) {
- startedTxs.add(1);
-
InternalTransaction tx;
if (readOnly) {
@@ -469,6 +468,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txStateVolatileStorage.initialize(tx);
+ txMetrics.onTransactionStarted();
+
return tx;
}
@@ -598,12 +599,14 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public void finishFull(
- HybridTimestampTracker timestampTracker, UUID txId, @Nullable
HybridTimestamp ts, boolean commit, boolean timeoutExceeded
+ HybridTimestampTracker timestampTracker,
+ UUID txId,
+ @Nullable HybridTimestamp ts,
+ boolean commit,
+ boolean timeoutExceeded
) {
TxState finalState;
- finishedTxs.add(1);
-
if (commit) {
assert ts != null : "RW transaction commit timestamp cannot be
null.";
@@ -624,6 +627,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
timeoutExceeded
));
+ txMetrics.onReadWriteTransactionFinished(txId, finalState ==
COMMITTED);
+
decrementRwTxCount(txId);
}
@@ -649,8 +654,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
assertReplicationGroupType(replicationGroupId);
}
- finishedTxs.add(1);
-
assert enlistedGroups != null;
if (enlistedGroups.isEmpty()) {
@@ -664,6 +667,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
timeout
));
+ txMetrics.onReadWriteTransactionFinished(txId, commitIntent);
+
decrementRwTxCount(txId);
return nullCompletedFuture();
@@ -688,7 +693,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
// Means we failed to CAS the state, someone else did it.
if (finishingStateMeta != stateMeta) {
- // If the state is FINISHING then someone else hase in in the
middle of finishing this tx.
+ // If the state is FINISHING then someone else is in the middle of
finishing this tx.
if (stateMeta.txState() == FINISHING) {
return ((TxStateMetaFinishing) stateMeta).txFinishFuture()
.thenCompose(meta -> checkTxOutcome(commitIntent,
txId, meta));
@@ -710,11 +715,15 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txId,
finishingStateMeta.txFinishFuture()
)
- ).thenAccept(unused -> {
+ ).whenComplete((unused, throwable) -> {
if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
+ txMetrics.onReadWriteTransactionFinished(txId, commitIntent &&
throwable == null);
+
decrementRwTxCount(txId);
}
- }).whenComplete((unused, throwable) ->
transactionInflights.removeTxContext(txId));
+
+ transactionInflights.removeTxContext(txId);
+ });
}
private void assertReplicationGroupType(ReplicationGroupId
replicationGroupId) {
@@ -914,12 +923,12 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public int finished() {
- return finishedTxs.intValue();
+ return (int) txMetrics.finishedTransactions();
}
@Override
public int pending() {
- return startedTxs.intValue() - finishedTxs.intValue();
+ return (int) txMetrics.activeTransactions();
}
@Override
@@ -1021,6 +1030,9 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
lockRetryCount = toIntExact(longProperty(systemCfg,
LOCK_RETRY_COUNT_PROP, LOCK_RETRY_COUNT_PROP_DEFAULT_VALUE));
+ metricsManager.registerSource(txMetrics);
+ metricsManager.enable(txMetrics);
+
return nullCompletedFuture();
});
}
@@ -1148,11 +1160,11 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return runAsync(runnable, writeIntentSwitchPool);
}
- void completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp,
boolean timeoutExceeded) {
- finishedTxs.add(1);
-
+ void completeReadOnlyTransactionFuture(boolean commitIntent,
TxIdAndTimestamp txIdAndTimestamp, boolean timeoutExceeded) {
UUID txId = txIdAndTimestamp.getTxId();
+ txMetrics.onReadOnlyTransactionFinished(txId, commitIntent);
+
transactionInflights.markReadOnlyTxFinished(txId, timeoutExceeded);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
new file mode 100644
index 00000000000..84d5b6d9de5
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.metrics;
+
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.Holder;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Transaction metric source, that contains a set of transaction metrics.
+ **/
+public class TransactionMetricsSource extends AbstractMetricSource<Holder> {
+ /** Histogram buckets for duration metrics in milliseconds. */
+ private static final long[] HISTOGRAM_BUCKETS =
+ {1, 2, 4, 8, 16, 25, 50, 75, 100, 250, 500, 750, 1000, 3000, 5000,
10000, 25000, 60000};
+
+ /** Source name. */
+ public static final String SOURCE_NAME = "transactions";
+
+ /** Clock service used to obtain the current time when calculating transaction durations. */
+ private final ClockService clockService;
+
+ /**
+ * Creates a new instance of {@link TransactionMetricsSource}.
+ */
+ public TransactionMetricsSource(ClockService clockService) {
+ super(SOURCE_NAME, "Transaction metrics.");
+
+ this.clockService = clockService;
+ }
+
+ /**
+ * Updates read-write related metrics.
+ *
+ * @param transactionId Transaction identifier.
+ * @param commit {@code true} if a transaction was committed, and {@code
false} otherwise.
+ */
+ public void onReadWriteTransactionFinished(UUID transactionId, boolean
commit) {
+ Holder holder = holder();
+
+ if (holder != null) {
+ holder.rwDuration.add(calculateTransactionDuration(transactionId));
+
+ holder.activeTransactions.decrement();
+
+ if (commit) {
+ holder.totalCommits.increment();
+ holder.rwCommits.increment();
+ } else {
+ holder.totalRollbacks.increment();
+ holder.rwRollbacks.increment();
+ }
+ }
+ }
+
+ /**
+ * Updates read-only related metrics.
+ *
+ * @param transactionId Transaction identifier.
+ * @param commit {@code true} if a transaction was committed, and {@code
false} otherwise.
+ */
+ public void onReadOnlyTransactionFinished(UUID transactionId, boolean
commit) {
+ Holder holder = holder();
+
+ if (holder != null) {
+ holder.roDuration.add(calculateTransactionDuration(transactionId));
+
+ holder.activeTransactions.decrement();
+
+ if (commit) {
+ holder.totalCommits.increment();
+ holder.roCommits.increment();
+ } else {
+ holder.totalRollbacks.increment();
+ holder.roRollbacks.increment();
+ }
+ }
+ }
+
+ /**
+ * Tracks a number of active transactions.
+ */
+ public void onTransactionStarted() {
+ Holder holder = holder();
+
+ if (holder != null) {
+ holder.activeTransactions.increment();
+ }
+ }
+
+ /**
+ * Returns a number of active transactions.
+ * If this metric source is not enabled, then always returns {@code 0}.
+ *
+ * @return Number of active transactions.
+ */
+ public long activeTransactions() {
+ Holder holder = holder();
+
+ if (holder != null) {
+ return holder.activeTransactions.value();
+ }
+
+ return 0L;
+ }
+
+ /**
+ * Returns a number of finished transactions.
+ * If this metric source is not enabled, then always returns {@code 0}.
+ *
+ * @return Number of finished transactions.
+ */
+ public long finishedTransactions() {
+ Holder holder = holder();
+
+ if (holder != null) {
+ return holder.totalCommits.value() + holder.totalRollbacks.value();
+ }
+
+ return 0L;
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ /**
+ * Calculates the time elapsed since the given transaction was started.
+ * The begin time is the physical component of the timestamp derived from the transaction id,
+ * so the current time must be taken as a physical component as well
+ * (the packed long form of a hybrid timestamp is not directly comparable with it).
+ *
+ * @param transactionId Transaction identifier.
+ * @return Difference between the current physical time and the physical begin time of the transaction.
+ */
+ private long calculateTransactionDuration(UUID transactionId) {
+ return clockService.current().getPhysical() -
beginTimestamp(transactionId).getPhysical();
+ }
+
+ /** Holder of the transaction metric instances exposed by this source. */
+ protected static class Holder implements
AbstractMetricSource.Holder<Holder> {
+ private final LongAdderMetric totalCommits = new LongAdderMetric(
+ "TotalCommits",
+ "Total number of commits.");
+
+ private final LongAdderMetric totalRollbacks = new LongAdderMetric(
+ "TotalRollbacks",
+ "Total number of rollbacks.");
+
+ private final LongAdderMetric rwCommits = new LongAdderMetric(
+ "RwCommits",
+ "Total number of read-write transaction commits.");
+
+ private final LongAdderMetric roCommits = new LongAdderMetric(
+ "RoCommits",
+ "Total number of read-only transaction commits.");
+
+ private final LongAdderMetric rwRollbacks = new LongAdderMetric(
+ "RwRollbacks",
+ "Total number of rolled-back read-write transactions.");
+
+ private final LongAdderMetric roRollbacks = new LongAdderMetric(
+ "RoRollbacks",
+ "Total number of rolled-back read-only transactions.");
+
+ private final DistributionMetric rwDuration = new DistributionMetric(
+ "RwDuration",
+ "Histogram of read-write transaction duration in milliseconds.",
+ HISTOGRAM_BUCKETS);
+
+ private final DistributionMetric roDuration = new DistributionMetric(
+ "RoDuration",
+ "Histogram of read-only transaction duration in milliseconds.",
+ HISTOGRAM_BUCKETS);
+
+ // NOTE(review): this counter is not part of the metrics list below, so it is not returned by metrics();
+ // it is only read through TransactionMetricsSource#activeTransactions(). Confirm that @TestOnly is intended.
+ @TestOnly
+ private final LongAdderMetric activeTransactions = new LongAdderMetric(
+ "Active",
+ "Number of running transactions.");
+
+ /** Metrics returned by {@link #metrics()}. */
+ private final List<Metric> metrics = List.of(
+ totalCommits,
+ rwCommits,
+ roCommits,
+ totalRollbacks,
+ rwRollbacks,
+ roRollbacks,
+ rwDuration,
+ roDuration);
+
+ @Override
+ public Iterable<Metric> metrics() {
+ return metrics;
+ }
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
index 97b8615cce4..02ef96b04ed 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
@@ -122,9 +122,13 @@ public class TransactionsViewProvider {
}
private static String deriveTransactionType(@Nullable
InternalTransaction tx) {
- assert tx != null;
-
- return tx.isReadOnly() ? READ_ONLY : READ_WRITE;
+ // The transaction tx can be null under some circumstances,
+ // even though this node is a transaction coordinator.
+ if (tx != null) {
+ return tx.isReadOnly() ? READ_ONLY : READ_WRITE;
+ } else {
+ return "n/a";
+ }
}
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TransactionMetricSourceTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TransactionMetricSourceTest.java
new file mode 100644
index 00000000000..9c7defbfb43
--- /dev/null
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TransactionMetricSourceTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Verifies the name of the transaction metric source and the names of the metrics it exposes.
+ *
+ * <p>When renaming a metric or adding a new one, remember to update the corresponding documentation as well.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TransactionMetricSourceTest extends BaseIgniteAbstractTest {
+    /** Clock service mock passed to the metric source constructor. */
+    @Mock
+    ClockService clockService;
+
+    /** Checks the value of the metric source name constant. */
+    @Test
+    void testMetricSourceName() {
+        assertThat(TransactionMetricsSource.SOURCE_NAME, is("transactions"));
+    }
+
+    /** Checks that the enabled metric source exposes exactly the expected set of metric names. */
+    @Test
+    void testMetricNames() {
+        TransactionMetricsSource source = new TransactionMetricsSource(clockService);
+        MetricSet enabledSet = source.enable();
+
+        assertThat(enabledSet, is(notNullValue()));
+
+        // Collect the names actually exposed by the enabled metric set.
+        Set<String> actualNames = new HashSet<>();
+        enabledSet.forEach(metric -> actualNames.add(metric.name()));
+
+        Set<String> expectedNames = Set.of(
+                "TotalCommits", "RwCommits", "RoCommits",
+                "TotalRollbacks", "RwRollbacks", "RoRollbacks",
+                "RwDuration", "RoDuration");
+
+        assertThat(actualNames, is(expectedNames));
+    }
+}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index e8f1730499c..44bdbe3a6a8 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -77,6 +77,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -191,7 +192,8 @@ public class TxManagerTest extends IgniteAbstractTest {
resourceRegistry,
transactionInflights,
lowWatermark,
- commonScheduler
+ commonScheduler,
+ new TestMetricManager()
);
assertThat(txManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());