This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 33fa7b0f3fa IGNITE-26328 Add table metrics (#6544)
33fa7b0f3fa is described below
commit 33fa7b0f3fadae266343ef2b39a780fb5686f215
Author: Slava Koptilin <[email protected]>
AuthorDate: Wed Sep 10 13:06:18 2025 +0300
IGNITE-26328 Add table metrics (#6544)
---
.../ignite/client/fakes/FakeInternalTable.java | 6 +
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../apache/ignite/internal/index/IndexManager.java | 4 +-
.../partition/replicator/fixtures/Node.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../exec/rel/TableScanNodeExecutionTest.java | 5 +-
modules/table/build.gradle | 1 +
...xDistributedTestSingleNodeNoCleanupMessage.java | 5 +-
.../ignite/internal/table/ItColocationTest.java | 5 +-
.../internal/table/metrics/ItTableMetricsTest.java | 631 +++++++++++++++++++++
.../ignite/internal/table/InternalTable.java | 8 +
.../apache/ignite/internal/table/TableImpl.java | 6 +
.../ignite/internal/table/TableViewInternal.java | 8 +
.../internal/table/distributed/TableManager.java | 45 +-
.../replicator/PartitionReplicaListener.java | 161 +++++-
.../distributed/storage/InternalTableImpl.java | 12 +-
.../internal/table/metrics/TableMetricSource.java | 222 ++++++++
.../distributed/TableManagerRecoveryTest.java | 4 +-
.../table/distributed/TableManagerTest.java | 4 +-
.../PartitionReplicaListenerIndexLockingTest.java | 5 +-
...itionReplicaListenerSortedIndexLockingTest.java | 5 +-
.../replication/PartitionReplicaListenerTest.java | 5 +-
.../ZonePartitionReplicaListenerTest.java | 5 +-
.../storage/InternalTableEstimatedSizeTest.java | 8 +-
.../distributed/storage/InternalTableImplTest.java | 5 +-
.../table/metrics/TableMetricSourceTest.java | 65 +++
.../apache/ignite/distributed/ItTxTestCluster.java | 7 +-
.../table/impl/DummyInternalTableImpl.java | 8 +-
29 files changed, 1207 insertions(+), 45 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 705788f6979..ce5759970d4 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -582,4 +583,9 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner
payload)
                .thenApply(resBytes -> new IgniteBiTuple<>(resBytes, FakeCompute.observableTimestamp.longValue()));
}
+
+ @Override
+ public TableMetricSource metrics() {
+ return null;
+ }
}
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 14c9cfd18be..d761b35bf3a 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1648,7 +1648,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
partitionReplicaLifecycleManager,
nodeProperties,
minTimeCollectorService,
- systemDistributedConfiguration
+ systemDistributedConfiguration,
+ metricManager
) {
@Override
protected TxStateStorage createTxStateTableStorage(
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 718717683f4..c23f00b43c5 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -211,8 +211,8 @@ public class IndexManager implements IgniteComponent {
if (LOG.isInfoEnabled()) {
LOG.info(
-                    "Creating local index: name={}, id={}, tableId={}, token={}",
-                    index.name(), indexId, tableId, causalityToken
+                    "Creating local index: name={}, id={}, tableId={}, token={}, type={}",
+                    index.name(), indexId, tableId, causalityToken, index.indexType()
);
}
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 84122d98061..1a06ed18804 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -803,7 +803,8 @@ public class Node {
partitionReplicaLifecycleManager,
nodeProperties,
minTimeCollectorService,
- systemDistributedConfiguration
+ systemDistributedConfiguration,
+ metricManager
) {
@Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1b17e86b6fd..8e0bded6243 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -811,7 +811,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
partitionReplicaLifecycleListener,
nodeProperties,
minTimeCollectorService,
- systemDistributedConfiguration
+ systemDistributedConfiguration,
+ metricManager
);
var indexManager = new IndexManager(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 06fb6074e40..4ec1efa464d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1148,7 +1148,8 @@ public class IgniteImpl implements Ignite {
partitionReplicaLifecycleManager,
nodeProperties,
minTimeCollectorService,
- systemDistributedConfiguration
+ systemDistributedConfiguration,
+ metricManager
);
disasterRecoveryManager = new DisasterRecoveryManager(
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index ff2f9fb7483..3c2b73cc3a2 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -88,6 +88,7 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.tx.TxManager;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -341,7 +343,8 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled()
+ colocationEnabled(),
+ new TableMetricSource(QualifiedName.fromSimple("test"))
);
this.dataAmount = dataAmount;
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 8b244242c92..86622fde1c3 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -86,6 +86,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-table'))
testImplementation testFixtures(project(':ignite-low-watermark'))
testImplementation testFixtures(project(':ignite-failure-handler'))
+ testImplementation testFixtures(project(':ignite-metrics'))
testImplementation libs.jmh.core
testImplementation libs.javax.annotations
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index ea00e8d6358..9f49366cc50 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
@@ -215,7 +217,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
mock(IndexMetaStorage.class),
lowWatermark,
mock(FailureProcessor.class),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
) {
@Override
            public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 8bc147b8e68..6cea81e1e53 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -105,6 +105,7 @@ import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersion
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -125,6 +126,7 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.Nullable;
@@ -369,7 +371,8 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled()
+ colocationEnabled(),
+ new TableMetricSource(QualifiedName.fromSimple("TEST"))
);
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
new file mode 100644
index 00000000000..2982523a157
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import static java.util.List.of;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
+import static org.apache.ignite.internal.table.metrics.TableMetricSource.RO_READS;
+import static org.apache.ignite.internal.table.metrics.TableMetricSource.RW_READS;
+import static org.apache.ignite.internal.table.metrics.TableMetricSource.WRITES;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests key-value and record view operations metrics.
+ */
+public class ItTableMetricsTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "test_table_name".toUpperCase();
+
+ private static final String SORTED_IDX = "SORTED_IDX";
+ private static final String HASH_IDX = "HASH_IDX";
+
+    private static final String METRIC_SOURCE_NAME = TableMetricSource.SOURCE_NAME + '.'
+ + QualifiedName.fromSimple(TABLE_NAME).toCanonicalForm();
+
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @BeforeAll
+ void createTable() throws Exception {
+        sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR)");
+
+        sql("CREATE INDEX IF NOT EXISTS " + SORTED_IDX + " ON PUBLIC." + TABLE_NAME + " USING SORTED (id)");
+        sql("CREATE INDEX IF NOT EXISTS " + HASH_IDX + " ON PUBLIC." + TABLE_NAME + " USING HASH (val)");
+
+ if (colocationEnabled()) {
+ awaitAssignmentsStabilizationOnDefaultZone(CLUSTER.aliveNode());
+ }
+ }
+
+ /**
+ * Returns a key value view for the table {@link #TABLE_NAME}.
+ *
+ * @param nodeIndex Node index to create a key value view.
+ * @return Key value view.
+ */
+ private static KeyValueView<Integer, String> keyValueView(int nodeIndex) {
+        return CLUSTER.node(nodeIndex).tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
+ }
+
+ /**
+ * Returns a record view for the table {@link #TABLE_NAME}.
+ *
+     * @param nodeIndex Node index to create a record view.
+ * @return Record view.
+ */
+ private static RecordView<Tuple> recordView(int nodeIndex) {
+ return CLUSTER.node(nodeIndex).tables().table(TABLE_NAME).recordView();
+ }
+
+ @Test
+ void get() {
+ // Implicit read-only transaction.
+ testKeyValueViewOperation(RO_READS, 1, view -> view.get(null, 12));
+
+ // Explicit read-write transaction.
+ testKeyValueViewOperation(RW_READS, 1, view -> {
+ Transaction tx = node(0).transactions().begin();
+
+ view.get(tx, 12);
+
+ tx.commit();
+ });
+ }
+
+ @Test
+ void getAll() {
+ List<Integer> keys = of(12, 15, 17, 19, 23);
+
+        // Implicit getAll operation starts a read-write transaction when all keys are not mapped to the same partition.
+        testKeyValueViewOperation(RW_READS, keys.size(), view -> view.getAll(null, keys));
+
+        // Single key getAll operation starts a read-only transaction.
+        List<Integer> roKeys = of(12);
+        testKeyValueViewOperation(RO_READS, 1, view -> view.getAll(null, roKeys));
+
+        List<Integer> nonUniqueKeys = of(12, 15, 12);
+        testKeyValueViewOperation(RW_READS, nonUniqueKeys.size(), view -> view.getAll(null, nonUniqueKeys));
+ }
+
+ @Test
+ void getOrDefault() {
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+
+ Integer existingKey = 12;
+ Integer nonExistingKey = -1;
+
+ kvView.put(null, existingKey, "value_12");
+ kvView.remove(null, nonExistingKey);
+
+ testKeyValueViewOperation(RO_READS, 2, view -> {
+ view.getOrDefault(null, existingKey, "default");
+ view.getOrDefault(null, nonExistingKey, "default");
+ });
+
+ testKeyValueViewOperation(RW_READS, 2, view -> {
+ Transaction tx = node(0).transactions().begin();
+
+ view.getOrDefault(tx, existingKey, "default");
+ view.getOrDefault(tx, nonExistingKey, "default");
+
+ tx.commit();
+ });
+ }
+
+ @Test
+ void contains() {
+        testKeyValueViewOperation(RO_READS, 1, view -> view.contains(null, 12));
+
+ testKeyValueViewOperation(RW_READS, 1, view -> {
+ Transaction tx = node(0).transactions().begin();
+
+ view.contains(tx, 12);
+
+ tx.commit();
+ });
+ }
+
+ @Test
+ void containsAll() {
+ List<Integer> keys = of(12, 15, 17, 19, 23);
+
+        // Implicit containsAll operation starts a read-write transaction when all keys are not mapped to the same partition.
+        testKeyValueViewOperation(RW_READS, keys.size(), view -> view.containsAll(null, keys));
+
+        // Single key.
+        List<Integer> roKeys = of(12);
+        testKeyValueViewOperation(RO_READS, 1, view -> view.containsAll(null, roKeys));
+ }
+
+ @Test
+ void put() {
+        testKeyValueViewOperation(WRITES, 1, view -> view.put(null, 42, "value_42"));
+ }
+
+ @Test
+ void putAll() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23");
+
+        testKeyValueViewOperation(WRITES, values.size(), view -> view.putAll(null, values));
+ }
+
+ @Test
+ void getAndPut() {
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.getAndPut(null, 12, "value"));
+ }
+
+ @Test
+ void remove() {
+ Integer key = 12;
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.put(null, key, "value_42");
+
+ // Remove existing key.
+ testKeyValueViewOperation(WRITES, 1, view -> view.remove(null, key));
+
+ // Remove non existing key.
+ testKeyValueViewOperation(WRITES, 0, view -> view.remove(null, key));
+ }
+
+ @Test
+ void exactRemove() {
+ Integer key = 12;
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.put(null, key, "value_42");
+
+ // Remove existing key and non-matching value.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.remove(null, key, "wrong-value"));
+
+        // Remove existing key and matching value.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.remove(null, key, "value_42"));
+
+        // Remove non existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.remove(null, key, "value_42"));
+ }
+
+ @Test
+ void removeAll() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23");
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.removeAll(null);
+        kvView.putAll(null, values);
+
+        testKeyValueViewOperation(WRITES, values.size(), view -> view.removeAll(null));
+ }
+
+ @Test
+ void removeCollectionKeys() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23");
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.putAll(null, values);
+
+ // Remove existing keys.
+        testKeyValueViewOperation(WRITES, values.size(), view -> view.removeAll(null, values.keySet()));
+
+        // Remove non-existing keys.
+        testKeyValueViewOperation(WRITES, 0, view -> view.removeAll(null, values.keySet()));
+
+ kvView.putAll(null, values);
+
+ // Remove non-unique keys.
+ List<Integer> nonUniqueKeys = of(12, 15, 12, 17, 19, 23);
+        testKeyValueViewOperation(WRITES, nonUniqueKeys.size() - 1, view -> view.removeAll(null, nonUniqueKeys));
+ }
+
+ @Test
+ void putIfAbsent() {
+ Integer key = 12;
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.remove(null, key);
+
+ // Insert absent key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.putIfAbsent(null, key, "value"));
+
+        // Insert existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.putIfAbsent(null, key, "value-42"));
+ }
+
+ @Test
+ void getAndRemove() {
+ Integer key = 12;
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.put(null, key, "value_42");
+
+ // Remove existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.getAndRemove(null, key));
+
+        // Remove non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.getAndRemove(null, key));
+ }
+
+ @Test
+ void replace() {
+ Integer key = 12;
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.put(null, key, "value");
+
+ // Replace existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.replace(null, key, "replaced"));
+
+        kvView.remove(null, key);
+
+        // Replace non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "value"));
+ }
+
+ @Test
+ void conditionalReplace() {
+ Integer key = 12;
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.put(null, key, "value");
+
+ // Replace existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "wrong", "replaced"));
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.replace(null, key, "value", "replaced"));
+
+        kvView.remove(null, key);
+
+        // Replace non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "replaced", "value"));
+ }
+
+ @Test
+ void getAndReplace() {
+ Integer key = 12;
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.put(null, key, "value");
+
+ // Replace existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.getAndReplace(null, key, "replaced"));
+
+        kvView.remove(null, key);
+
+        // Replace non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "replaced"));
+ }
+
+ @Test
+ void insertAll() {
+        List<Tuple> keys = of(Tuple.create().set("id", 12), Tuple.create().set("id", 42));
+        List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))).collect(toList());
+
+ recordView(0).deleteAll(null, keys);
+
+ // Insert non-existing keys.
+        testRecordViewOperation(of(WRITES, RW_READS), of((long) recs.size(), (long) recs.size()), view -> view.insertAll(null, recs));
+
+        // Insert existing keys.
+        testRecordViewOperation(of(WRITES, RW_READS), of(0L, (long) recs.size()), view -> view.insertAll(null, recs));
+
+        recordView(0).delete(null, keys.get(0));
+
+        // Insert one non-existing key.
+        testRecordViewOperation(of(WRITES, RW_READS), of(1L, (long) recs.size()), view -> view.insertAll(null, recs));
+
+ // Insert non-unique keys.
+ List<Tuple> nonUniqueKeys = of(
+ Tuple.create().set("id", 12),
+ Tuple.create().set("id", 42),
+ Tuple.create().set("id", 12));
+ List<Tuple> nonUniqueValues = nonUniqueKeys
+ .stream()
+                .map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id")))
+ .collect(toList());
+
+ recordView(0).deleteAll(null, keys);
+
+ testRecordViewOperation(
+ of(WRITES, RW_READS),
+                of((long) nonUniqueKeys.size() - 1, (long) nonUniqueKeys.size()),
+ view -> view.insertAll(null, nonUniqueValues));
+ }
+
+ @Test
+ void deleteAll() {
+        List<Tuple> keys = of(Tuple.create().set("id", 12), Tuple.create().set("id", 42));
+        List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))).collect(toList());
+
+ recordView(0).upsertAll(null, recs);
+
+ // Delete existing keys.
+        testRecordViewOperation(WRITES, recs.size(), view -> view.deleteAll(null, keys));
+
+        // Delete non-existing keys.
+        testRecordViewOperation(WRITES, 0L, view -> view.deleteAll(null, keys));
+
+        recordView(0).insert(null, recs.get(0));
+
+        // Delete one non-existing key.
+        testRecordViewOperation(WRITES, 1L, view -> view.deleteAll(null, keys));
+
+ // Non-unique keys.
+ List<Tuple> nonUniqueKeys = of(
+ Tuple.create().set("id", 12),
+ Tuple.create().set("id", 42),
+ Tuple.create().set("id", 12));
+ List<Tuple> nonUniqueRecs = nonUniqueKeys
+ .stream()
+                .map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id")))
+ .collect(toList());
+
+ recordView(0).upsertAll(null, nonUniqueRecs);
+
+        testRecordViewOperation(WRITES, 2L, view -> view.deleteAll(null, nonUniqueKeys));
+ }
+
+ @Test
+ void deleteAllExact() {
+        List<Tuple> keys = of(Tuple.create().set("id", 12), Tuple.create().set("id", 42));
+        List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))).collect(toList());
+
+ recordView(0).upsertAll(null, recs);
+
+ // Delete existing keys.
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), (long) recs.size()), view -> view.deleteAllExact(null, recs));
+
+        // Delete non-existing keys.
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 0L), view -> view.deleteAllExact(null, recs));
+
+        recordView(0).insert(null, recs.get(0));
+
+        // Delete one non-existing key.
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 1L), view -> view.deleteAllExact(null, recs));
+
+ recordView(0).upsertAll(null, recs);
+        List<Tuple> nonExact = keys.stream().map(t -> Tuple.copy(t).set("val", "value_xyz_" + t.intValue("id"))).collect(toList());
+
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 0L), view -> view.deleteAllExact(null, nonExact));
+
+ // Non-unique keys.
+ List<Tuple> nonUniqueKeys = of(
+ Tuple.create().set("id", 12),
+ Tuple.create().set("id", 42),
+ Tuple.create().set("id", 12));
+ List<Tuple> nonUniqueRecs = nonUniqueKeys
+ .stream()
+                .map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id")))
+ .collect(toList());
+
+ recordView(0).upsertAll(null, nonUniqueRecs);
+
+ testRecordViewOperation(
+ of(RW_READS, WRITES),
+ of((long) nonUniqueRecs.size(), 2L),
+ view -> view.deleteAllExact(null, nonUniqueRecs));
+ }
+
+ @Test
+ void scan() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23");
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.removeAll(null);
+ kvView.putAll(null, values);
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, (long) values.size()), view -> {
+ Transaction tx = node(0).transactions().begin();
+
+ Object[] emptyArgs = new Object[0];
+ sql(0, tx, "select * from " + TABLE_NAME, emptyArgs);
+
+ tx.commit();
+ });
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of((long) values.size(), 0L), view -> {
+ Object[] emptyArgs = new Object[0];
+ sql(0, null, "select * from " + TABLE_NAME, emptyArgs);
+ });
+ }
+
+ @Test
+ void sortedIndexScan() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23");
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.removeAll(null);
+ kvView.putAll(null, values);
+
+ testKeyValueViewOperation(of(RO_READS, RW_READS), of(2L, 0L), view -> {
+ Object[] emptyArgs = new Object[0];
+            sql(0, null, "select /*+ force_index (" + SORTED_IDX + ") */ * from " + TABLE_NAME + " where id > 15 and id < 20", emptyArgs);
+ });
+
+ testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, 2L), view -> {
+ Transaction tx = node(0).transactions().begin();
+
+ Object[] emptyArgs = new Object[0];
+            sql(0, tx, "select /*+ force_index (" + SORTED_IDX + ") */ * from " + TABLE_NAME + " where id > 15 and id < 20", emptyArgs);
+
+ tx.commit();
+ });
+ }
+
+ @Test
+ void hashIndexScan() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23");
+
+ KeyValueView<Integer, String> kvView = keyValueView(0);
+ kvView.removeAll(null);
+ kvView.putAll(null, values);
+
+ testKeyValueViewOperation(of(RO_READS, RW_READS), of(1L, 0L), view -> {
+ Object[] emptyArgs = new Object[0];
+            sql(0, null, "select /*+ force_index (" + HASH_IDX + ") */ * from " + TABLE_NAME + " where val = '19'", emptyArgs);
+ });
+
+ testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, 1L), view -> {
+ Transaction tx = node(0).transactions().begin();
+
+ Object[] emptyArgs = new Object[0];
+            sql(0, tx, "select /*+ force_index (" + HASH_IDX + ") */ * from " + TABLE_NAME + " where val = '19'", emptyArgs);
+
+ tx.commit();
+ });
+ }
+
+ /**
+     * Tests that the given operation increases the specified metric by the expected value.
+ *
+ * @param metricName Metric name to be checked.
+ * @param expectedValue Expected value to increase the metric.
+ * @param op Operation to be executed.
+ */
+    private void testKeyValueViewOperation(String metricName, long expectedValue, Consumer<KeyValueView<Integer, String>> op) {
+ testKeyValueViewOperation(of(metricName), of(expectedValue), op);
+ }
+
+ /**
+     * Tests that the given operation increases the specified metrics by the expected values.
+ *
+ * @param metricNames Metric names to be checked.
+ * @param expectedValues Expected values to increase the metrics.
+ * @param op Operation to be executed.
+ */
+ private void testKeyValueViewOperation(
+ List<String> metricNames,
+ List<Long> expectedValues,
+ Consumer<KeyValueView<Integer, String>> op
+ ) {
+        testOperation(metricNames, expectedValues, () -> op.accept(keyValueView(0)));
+ }
+
+ /**
+     * Tests that the given operation increases the specified metric by the expected value.
+ *
+ * @param metricName Metric name to be checked.
+ * @param expectedValue Expected value to increase the metric.
+ * @param op Operation to be executed.
+ */
+    private void testRecordViewOperation(String metricName, long expectedValue, Consumer<RecordView<Tuple>> op) {
+ testRecordViewOperation(of(metricName), of(expectedValue), op);
+ }
+
+ /**
+     * Tests that the given operation increases the specified metrics by the expected values.
+ *
+ * @param metricNames Metric names to be checked.
+ * @param expectedValues Expected values to increase the metrics.
+ * @param op Operation to be executed.
+ */
+ private void testRecordViewOperation(
+ List<String> metricNames,
+ List<Long> expectedValues,
+ Consumer<RecordView<Tuple>> op
+ ) {
+        testOperation(metricNames, expectedValues, () -> op.accept(recordView(0)));
+ }
+
+ /**
+     * Tests that the given operation increases the specified metrics by the expected values.
+ *
+ * @param metricNames Metric names to be checked.
+ * @param expectedValues Expected values to increase the metrics.
+ * @param op Operation to be executed.
+ */
+ private void testOperation(
+ List<String> metricNames,
+ List<Long> expectedValues,
+ Runnable op
+ ) {
+ assertThat(metricNames.size(), is(expectedValues.size()));
+
+ Map<String, Long> initialValues = metricValues(metricNames);
+
+ op.run();
+
+ Map<String, Long> actualValues = metricValues(metricNames);
+
+ for (int i = 0; i < metricNames.size(); ++i) {
+ String metricName = metricNames.get(i);
+ long expectedValue = expectedValues.get(i);
+
+ long initialValue = initialValues.get(metricName);
+ long actualValue = actualValues.get(metricName);
+
+ assertThat(
+                    "The actual metric value does not match the expected value "
+                            + "[metric=" + metricName + ", initial=" + initialValue + ", actual=" + actualValue
+                            + ", expected=" + (initialValue + expectedValue) + ']',
+ actualValue,
+ is(initialValue + expectedValue));
+ }
+ }
+
+ /**
+ * Returns the sum of the specified metrics on all nodes.
+ *
+ * @param metricNames Metric names.
+ * @return Map of metric names to their values.
+ */
+ private Map<String, Long> metricValues(List<String> metricNames) {
+ Map<String, Long> values = new HashMap<>(metricNames.size());
+
+ for (int i = 0; i < initialNodes(); ++i) {
+ MetricSet tableMetrics = unwrapIgniteImpl(node(i))
+ .metricManager()
+ .metricSnapshot()
+ .metrics()
+ .get(METRIC_SOURCE_NAME);
+
+ metricNames.forEach(metricName ->
+ values.compute(metricName, (k, v) -> {
+ Metric metric = tableMetrics.get(metricName);
+
+                        assertThat("Metric not found [name=" + metricName + ']', metric, is(notNullValue()));
+                        assertThat(
+                                "Metric is not a LongMetric [name=" + metricName + ", class=" + metric.getClass().getSimpleName() + ']',
+                                metric,
+                                instanceOf(LongMetric.class));
+
+                        return (v == null ? 0 : v) + ((LongMetric) metric).value();
+ }));
+ }
+
+ return values;
+ }
+}
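A minimal usage sketch of reading the new per-table counters from one node's metric snapshot, mirroring the metricValues() helper in the test above; the source-name composition (TableMetricSource.SOURCE_NAME + '.' + canonical table name) and the RO_READS/RW_READS/WRITES metric names are taken from this test, while the ignite and tableName variables are illustrative only:

    // Assumes an unwrapped IgniteImpl instance, obtained as in this test via unwrapIgniteImpl(node(i)).
    MetricSet tableMetrics = ignite.metricManager()
            .metricSnapshot()
            .metrics()
            .get(TableMetricSource.SOURCE_NAME + '.' + QualifiedName.fromSimple(tableName).toCanonicalForm());

    // Each metric is exposed as a LongMetric on the node-local metric set.
    long writes = ((LongMetric) tableMetrics.get(TableMetricSource.WRITES)).value();
    long roReads = ((LongMetric) tableMetrics.get(TableMetricSource.RO_READS)).value();
    long rwReads = ((LongMetric) tableMetrics.get(TableMetricSource.RW_READS)).value();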
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 5cefe7547f9..44bdbadf5c9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -552,4 +553,11 @@ public interface InternalTable extends ManuallyCloseable {
* @return The id.
*/
ReplicationGroupId targetReplicationGroupId(int partId);
+
+ /**
+ * Returns a metric source for this table.
+ *
+ * @return Table metrics source.
+ */
+ TableMetricSource metrics();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 856142d3efd..8e9908b84cf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.sql.IgniteSql;
@@ -310,4 +311,9 @@ public class TableImpl implements TableViewInternal {
}
});
}
+
+ @Override
+ public TableMetricSource metrics() {
+ return tbl.metrics();
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 2a40ba63149..3dac8dd01d2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
@@ -127,4 +128,11 @@ public interface TableViewInternal extends Table {
* @param indexId An index id to unregister.
*/
void unregisterIndex(int indexId);
+
+ /**
+ * Returns a metric source for this table.
+ *
+ * @return Table metrics source.
+ */
+ TableMetricSource metrics();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 23cd96c1049..3126ff450ba 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -147,6 +147,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
@@ -232,6 +233,7 @@ import org.apache.ignite.internal.table.distributed.storage.BrokenTxStateStorage
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -464,6 +466,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
private final TableAssignmentsService assignmentsService;
private final ReliableCatalogVersions reliableCatalogVersions;
+ private final MetricManager metricManager;
+
/**
* Creates a new table manager.
*
@@ -494,6 +498,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     * @param partitionReplicaLifecycleManager Partition replica lifecycle manager.
     * @param minTimeCollectorService Collects minimum required timestamp for each partition.
* @param systemDistributedConfiguration System distributed configuration.
+ * @param metricManager Metric manager.
*/
public TableManager(
String nodeName,
@@ -533,7 +538,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
NodeProperties nodeProperties,
MinimumRequiredTimeCollectorService minTimeCollectorService,
- SystemDistributedConfiguration systemDistributedConfiguration
+ SystemDistributedConfiguration systemDistributedConfiguration,
+ MetricManager metricManager
) {
this.topologyService = topologyService;
this.replicaMgr = replicaMgr;
@@ -563,6 +569,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
this.nodeProperties = nodeProperties;
this.minTimeCollectorService = minTimeCollectorService;
+ this.metricManager = metricManager;
        this.executorInclinedSchemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
        this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1171,6 +1178,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
private void onTableDrop(DropTableEventParameters parameters) {
inBusyLock(busyLock, () -> {
+ unregisterMetricsSource(parameters.tableId());
+
            destructionEventsQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
});
}
@@ -1533,7 +1542,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
indexMetaStorage,
lowWatermark,
failureProcessor,
- nodeProperties
+ nodeProperties,
+ table.metrics()
);
}
@@ -1804,7 +1814,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
Objects.requireNonNull(streamerReceiverRunner),
() -> txCfg.value().readWriteTimeoutMillis(),
() -> txCfg.value().readOnlyTimeoutMillis(),
- nodeProperties.colocationEnabled()
+ nodeProperties.colocationEnabled(),
+ createAndRegisterMetricsSource(tableName)
);
return new TableImpl(
@@ -3218,6 +3229,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
int tableId = tableDescriptor.id();
            if (nextCatalog != null && nextCatalog.table(tableId) == null) {
+ unregisterMetricsSource(tableId);
+
                destructionEventsQueue.enqueue(new DestroyTableEvent(nextCatalog.version(), tableId));
}
@@ -3546,6 +3559,32 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
}
}
+    private TableMetricSource createAndRegisterMetricsSource(QualifiedName tableName) {
+ TableMetricSource source = new TableMetricSource(tableName);
+
+ try {
+ metricManager.registerSource(source);
+ metricManager.enable(source);
+ } catch (Exception e) {
+            LOG.warn("Failed to register metrics source for table [name={}].", e, source.qualifiedTableName());
+ }
+
+ return source;
+ }
+
+ private void unregisterMetricsSource(int tableId) {
+ try {
+ TableViewInternal table = startedTables.get(tableId);
+ if (table == null) {
+ return;
+ }
+
+ metricManager.unregisterSource(table.metrics());
+ } catch (Exception e) {
+            LOG.warn("Failed to unregister metrics source for table [tableId={}].", e, tableId);
+ }
+ }
+
private static class TableClosedException extends IgniteInternalException {
private static final long serialVersionUID = 1L;
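The TableMetricSource class itself (222 added lines, see the diffstat above) is not included in this excerpt. Below is a rough sketch of the surface implied by its call sites in TableManager, the tests, and PartitionReplicaListener; the metric-name constant values, the LongAdder-based counters, and the omission of the actual MetricSource/MetricManager registration plumbing are assumptions for illustration only, not the committed implementation.

    import java.util.concurrent.atomic.LongAdder;
    import org.apache.ignite.table.QualifiedName;

    // Hypothetical shape only; the real class is registered with MetricManager and exposes LongMetric instances.
    public class TableMetricSource {
        public static final String SOURCE_NAME = "tables";   // assumption: actual value not visible in this excerpt
        public static final String RO_READS = "RoReads";      // assumption
        public static final String RW_READS = "RwReads";      // assumption
        public static final String WRITES = "Writes";         // assumption

        private final QualifiedName tableName;
        private final LongAdder roReads = new LongAdder();
        private final LongAdder rwReads = new LongAdder();
        private final LongAdder writes = new LongAdder();

        public TableMetricSource(QualifiedName tableName) {
            this.tableName = tableName;
        }

        public QualifiedName qualifiedTableName() {
            return tableName;
        }

        // Counts a single-row read; readOnly distinguishes RO and RW transaction reads.
        public void onRead(boolean readOnly) {
            onRead(1, readOnly);
        }

        public void onRead(int count, boolean readOnly) {
            (readOnly ? roReads : rwReads).add(count);
        }

        // Counts a single-row write.
        public void onWrite() {
            onWrite(1);
        }

        public void onWrite(int count) {
            writes.add(count);
        }
    }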
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index a58ee9bb9fb..669a2a58ccc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -196,6 +196,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.TableUtils;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
@@ -346,6 +347,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
    private static final boolean SKIP_UPDATES = getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
+ private final TableMetricSource metrics;
+
private final ReplicaPrimacyEngine replicaPrimacyEngine;
    private final TableAwareReplicaRequestPreProcessor tableAwareReplicaRequestPreProcessor;
private final ReliableCatalogVersions reliableCatalogVersions;
@@ -384,6 +387,7 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
* @param clusterNodeResolver Node resolver.
* @param remotelyTriggeredResourceRegistry Resource registry.
* @param indexMetaStorage Index meta storage.
+ * @param metrics Table metric source.
*/
public PartitionReplicaListener(
MvPartitionStorage mvDataStorage,
@@ -412,7 +416,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
IndexMetaStorage indexMetaStorage,
LowWatermark lowWatermark,
FailureProcessor failureProcessor,
- NodeProperties nodeProperties
+ NodeProperties nodeProperties,
+ TableMetricSource metrics
) {
this.mvDataStorage = mvDataStorage;
this.raftCommandRunner = raftCommandRunner;
@@ -435,6 +440,7 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
this.replicationGroupId = replicationGroupId;
this.tableId = tableId;
        this.tableLockKey = new TablePartitionId(tableId, replicationGroupId.partitionId());
+ this.metrics = metrics;
        this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
@@ -771,7 +777,11 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
return completedFuture(rows);
} else {
                return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
- .thenApply(ignored -> rows);
+ .thenApply(ignored -> {
+ metrics.onRead(rows.size(), false);
+
+ return rows;
+ });
}
})
.whenComplete((rows, err) -> {
@@ -840,7 +850,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
-        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
+        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp)
+                ? nullCompletedFuture()
+ ? nullCompletedFuture()
: safeTime.waitFor(readTimestamp);
if (request.indexToUse() != null) {
@@ -853,18 +864,34 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
if (request.exactKey() != null) {
                assert request.lowerBoundPrefix() == null && request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
-                return safeReadFuture.thenCompose(unused -> lookupIndex(request, indexStorage));
+                return safeReadFuture
+                        .thenCompose(unused -> lookupIndex(request, indexStorage))
+ .thenApply(rows -> {
+ metrics.onRead(rows.size(), true);
+
+ return rows;
+ });
}
assert indexStorage.storage() instanceof SortedIndexStorage;
-            return safeReadFuture.thenCompose(unused -> scanSortedIndex(request, indexStorage));
+            return safeReadFuture
+                    .thenCompose(unused -> scanSortedIndex(request, indexStorage))
+ .thenApply(rows -> {
+ metrics.onRead(rows.size(), true);
+
+ return rows;
+ });
}
return safeReadFuture
.thenCompose(
-                        unused -> retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), readTimestamp, cursorId, batchCount)
-                );
+                        unused -> retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), readTimestamp, cursorId, batchCount))
+ .thenApply(rows -> {
+ metrics.onRead(rows.size(), true);
+
+ return rows;
+ });
}
/**
@@ -1054,11 +1081,13 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
HybridTimestamp readTimestamp = request.readTimestamp();
if (request.requestType() != RO_GET) {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ throw new IgniteInternalException(
+ Replicator.REPLICA_COMMON_ERR,
format("Unknown single request [actionType={}]",
request.requestType()));
}
-        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
+        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp)
+                ? nullCompletedFuture()
+ ? nullCompletedFuture()
: safeTime.waitFor(request.readTimestamp());
        return safeReadFuture.thenCompose(unused -> resolveRowByPkForReadOnly(primaryKey, readTimestamp));
@@ -1090,11 +1119,13 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
HybridTimestamp readTimestamp = request.readTimestamp();
if (request.requestType() != RO_GET_ALL) {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ throw new IgniteInternalException(
+ Replicator.REPLICA_COMMON_ERR,
format("Unknown single request [actionType={}]",
request.requestType()));
}
-        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
+        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp)
+                ? nullCompletedFuture()
+ ? nullCompletedFuture()
: safeTime.waitFor(request.readTimestamp());
return safeReadFuture.thenCompose(unused -> {
@@ -1931,10 +1962,14 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
// Nothing found in the storage, return null.
if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
+ metrics.onRead(true);
+
return nullCompletedFuture();
}
if (writeIntents.isEmpty()) {
+ metrics.onRead(true);
+
                // No write intents, then return the committed value. We already know that regularEntries is not empty.
return completedFuture(regularEntries.get(0).binaryRow());
} else {
@@ -1948,6 +1983,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
resolveWriteIntentReadability(writeIntent, ts)
                    .thenApply(writeIntentReadable -> inBusyLock(busyLock, () -> {
+ metrics.onRead(true);
+
if (writeIntentReadable) {
                            return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
} else {
@@ -2104,6 +2141,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
}
if (rowIdsToDelete.isEmpty()) {
+ metrics.onRead(searchRows.size(), false);
+
                        return completedFuture(new ReplicaResult(result, null));
}
@@ -2117,7 +2156,12 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
leaseStartTime
)
)
- .thenApply(res -> new ReplicaResult(result, res));
+ .thenApply(res -> {
+ metrics.onRead(searchRows.size(), false);
+ metrics.onWrite(rowIdsToDelete.size());
+
+ return new ReplicaResult(result, res);
+ });
});
}
case RW_INSERT_ALL: {
@@ -2153,6 +2197,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
}
if (rowsToInsert.isEmpty()) {
+ metrics.onRead(searchRows.size(), false);
+
                        return completedFuture(new ReplicaResult(result, null));
}
@@ -2185,6 +2231,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
)
)
.thenApply(res -> {
+ metrics.onRead(searchRows.size(), false);
+ metrics.onWrite(rowsToInsert.size());
+
// Release short term locks.
                            for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> insertLockFut : insertLockFuts) {
insertLockFut.join().get2()
@@ -2222,11 +2271,14 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
}
}
+ int uniqueKeysCount = 0;
for (int i = 0; i < searchRows.size(); i++) {
if (rowIdFuts[i] != null) {
continue; // Skip previous row with the same key.
}
+ uniqueKeysCount++;
+
BinaryRow searchRow = searchRows.get(i);
boolean isDelete = deleted != null && deleted.get(i);
@@ -2256,6 +2308,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
});
}
+ int uniqueKeysCountFinal = uniqueKeysCount;
+
return allOf(rowIdFuts).thenCompose(ignore -> {
                Map<UUID, TimedBinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(searchRows.size());
List<RowId> rows = new ArrayList<>();
@@ -2282,6 +2336,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
}
if (rowsToUpdate.isEmpty()) {
+ metrics.onRead(uniqueKeysCountFinal, false);
+
return completedFuture(new ReplicaResult(null, null));
}
@@ -2296,6 +2352,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
)
)
.thenApply(res -> {
+ metrics.onRead(uniqueKeysCountFinal, false);
+ metrics.onWrite(uniqueKeysCountFinal);
+
// Release short term locks.
                            for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> rowIdFut : rowIdFuts) {
                                IgniteBiTuple<RowId, Collection<Lock>> futRes = rowIdFut.join();
@@ -2356,11 +2415,17 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
}
if (allElementsAreNull(result)) {
+ metrics.onRead(result.size(), false);
+
                            return completedFuture(new ReplicaResult(result, null));
}
                        return validateRwReadAgainstSchemaAfterTakingLocks(txId)
-                                .thenApply(unused -> new ReplicaResult(result, null));
+ .thenApply(unused -> {
+ metrics.onRead(result.size(), false);
+
+ return new ReplicaResult(result, null);
+ });
});
}
case RW_DELETE_ALL: {
@@ -2422,7 +2487,11 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
leaseStartTime
)
)
- .thenApply(res -> new ReplicaResult(result, res));
+ .thenApply(res -> {
+ metrics.onWrite(rowIdsToDelete.size());
+
+ return new ReplicaResult(result, res);
+ });
});
}
default: {
@@ -2732,7 +2801,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
HybridTimestamp readTimestamp = opStartTimestamp;
if (request.requestType() != RO_GET) {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ throw new IgniteInternalException(
+ Replicator.REPLICA_COMMON_ERR,
format("Unknown single request [actionType={}]",
request.requestType()));
}
@@ -2757,12 +2827,16 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
case RW_DELETE_EXACT: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
if (rowId == null) {
+ metrics.onRead(false);
+
return completedFuture(new ReplicaResult(false, null));
}
return takeLocksForDeleteExact(searchRow, rowId, row, txId)
.thenCompose(validatedRowId -> {
if (validatedRowId == null) {
+ metrics.onRead(false);
+
                                return completedFuture(new ReplicaResult(false, null));
}
@@ -2778,13 +2852,20 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
leaseStartTime
)
)
-                            .thenApply(res -> new ReplicaResult(true, res));
+                            .thenApply(res -> {
+                                metrics.onRead(false);
+                                metrics.onWrite();
+
+                                return new ReplicaResult(true, res);
+ });
});
});
}
case RW_INSERT: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
if (rowId != null) {
+ metrics.onRead(false);
+
return completedFuture(new ReplicaResult(false, null));
}
@@ -2804,6 +2885,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
)
                        .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
.thenApply(tuple -> {
+ metrics.onRead(false);
+ metrics.onWrite();
+
// Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
@@ -2836,6 +2920,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
)
                        .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
.thenApply(tuple -> {
+ metrics.onWrite();
+
// Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
@@ -2868,6 +2954,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
)
                        .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
.thenApply(tuple -> {
+ metrics.onRead(false);
+ metrics.onWrite();
+
// Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
@@ -2878,6 +2967,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr
case RW_GET_AND_REPLACE: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
+ metrics.onRead(false);
+
return completedFuture(new ReplicaResult(null, null));
}
@@ -2896,6 +2987,9 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock)))
.thenApply(tuple -> {
+ metrics.onRead(false);
+ metrics.onWrite();
+
// Release short term locks.
tuple.get2().get2().forEach(lock ->
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
@@ -2906,6 +3000,8 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
case RW_REPLACE_IF_EXIST: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
+ metrics.onRead(false);
+
return completedFuture(new ReplicaResult(false, null));
}
@@ -2924,6 +3020,9 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock)))
.thenApply(tuple -> {
+ metrics.onRead(false);
+ metrics.onWrite();
+
// Release short term locks.
tuple.get2().get2().forEach(lock ->
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
@@ -2957,12 +3056,18 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
case RW_GET: {
return resolveRowByPk(primaryKey, txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
+ metrics.onRead(false);
+
return nullCompletedFuture();
}
return takeLocksForGet(rowId, txId)
.thenCompose(ignored ->
validateRwReadAgainstSchemaAfterTakingLocks(txId))
- .thenApply(ignored -> new ReplicaResult(row,
null));
+ .thenApply(ignored -> {
+ metrics.onRead(false);
+
+ return new ReplicaResult(row, null);
+ });
});
}
case RW_DELETE: {
@@ -2988,12 +3093,18 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
leaseStartTime
)
)
- .thenApply(res -> new ReplicaResult(true, res));
+ .thenApply(res -> {
+ metrics.onWrite();
+
+ return new ReplicaResult(true, res);
+ });
});
}
case RW_GET_AND_DELETE: {
return resolveRowByPk(primaryKey, txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
+ metrics.onRead(false);
+
return nullCompletedFuture();
}
@@ -3014,7 +3125,12 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
leaseStartTime
)
)
- .thenApply(res -> new ReplicaResult(row, res));
+ .thenApply(res -> {
+ metrics.onRead(false);
+ metrics.onWrite();
+
+ return new ReplicaResult(row, res);
+ });
});
}
default: {
@@ -3229,12 +3345,16 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
if (request.requestType() == RW_REPLACE) {
return resolveRowByPk(extractPk(newRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
+ metrics.onRead(false);
+
return completedFuture(new ReplicaResult(false, null));
}
return takeLocksForReplace(expectedRow, row, newRow, rowId,
txId)
.thenCompose(rowIdLock -> {
if (rowIdLock == null) {
+ metrics.onRead(false);
+
return completedFuture(new
ReplicaResult(false, null));
}
@@ -3256,6 +3376,9 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock))
.thenApply(tuple -> {
+ metrics.onRead(false);
+ metrics.onWrite();
+
// Release short term locks.
tuple.get2().get2()
.forEach(lock ->
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
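
The PartitionReplicaListener hunks above all follow the same accounting rule: a counter is bumped as soon as the outcome of the request is known, i.e. a miss is counted as a read immediately, while a successful read or write is counted on the completion path of the operation's future, after locks and schema validation have succeeded. The snippet below is a minimal, self-contained sketch of that pattern; the class, method and field names are illustrative, it is not the Ignite listener code itself:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.LongAdder;

    class RwAccountingSketch {
        private final LongAdder rwReads = new LongAdder();
        private final LongAdder writes = new LongAdder();

        // Mirrors the RW_GET_AND_DELETE shape: a miss is counted as a read right away;
        // on a hit, the read and the write are recorded only after the asynchronous
        // delete step has completed successfully.
        CompletableFuture<String> getAndDelete(String row, CompletableFuture<Void> deleteOp) {
            if (row == null) {
                rwReads.increment();

                return CompletableFuture.completedFuture(null);
            }

            return deleteOp.thenApply(ignored -> {
                rwReads.increment();
                writes.increment();

                return row;
            });
        }
    }
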
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9fa5cb52364..8f418d59651 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -119,6 +119,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import
org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionIds;
@@ -211,6 +212,8 @@ public class InternalTableImpl implements InternalTable {
private final boolean colocationEnabled;
+ private final TableMetricSource metrics;
+
/**
* Constructor.
*
@@ -249,7 +252,8 @@ public class InternalTableImpl implements InternalTable {
StreamerReceiverRunner streamerReceiverRunner,
Supplier<Long> defaultRwTxTimeout,
Supplier<Long> defaultReadTxTimeout,
- boolean colocationEnabled
+ boolean colocationEnabled,
+ TableMetricSource metrics
) {
this.tableName = tableName;
this.zoneId = zoneId;
@@ -269,6 +273,7 @@ public class InternalTableImpl implements InternalTable {
this.defaultRwTxTimeout = defaultRwTxTimeout;
this.defaultReadTxTimeout = defaultReadTxTimeout;
this.colocationEnabled = colocationEnabled;
+ this.metrics = metrics;
}
/** {@inheritDoc} */
@@ -2404,4 +2409,9 @@ public class InternalTableImpl implements InternalTable {
Boolean full
);
}
+
+ @Override
+ public TableMetricSource metrics() {
+ return metrics;
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
new file mode 100644
index 00000000000..0348a4f9b79
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.table.metrics.TableMetricSource.Holder;
+import org.apache.ignite.table.QualifiedName;
+
+/**
+ * Set of metrics related to a specific table.
+ *
+ * <p>
+ * <b>Metrics affected by key-value and record view operations:</b>
+ * <table border="1">
+ * <caption>Methods and affected metrics</caption>
+ * <tr>
+ * <th>Method(s)</th>
+ * <th>RoReads</th>
+ * <th>RwReads</th>
+ * <th>Writes</th>
+ * </tr>
+ * <tr>
+ * <td>get, getOrDefault, contains</td>
+ * <td>Yes, for read-only transactions</td>
+ * <td>Yes, for read-write transactions</td>
+ * <td>No</td>
+ * </tr>
+ * <tr>
+ * <td>getAll, containsAll</td>
+ * <td>Yes, it is incremented by the number of keys read for read-only
transactions</td>
+ * <td>Yes, it is incremented by the number of keys read for read-write
transactions</td>
+ * <td>No</td>
+ * </tr>
+ * <tr>
+ * <td>put, upsert</td>
+ * <td>No</td>
+ * <td>No</td>
+ * <td>Yes</td>
+ * </tr>
+ * <tr>
+ * <td>putAll, upsertAll</td>
+ * <td>No</td>
+ * <td>No</td>
+ * <td>Yes, it is incremented by the number of keys inserted</td>
+ * </tr>
+ * <tr>
+ * <td>putIfAbsent, insert</td>
+ * <td>No</td>
+ * <td>Yes</td>
+ * <td>Yes, if the method returns true</td>
+ * </tr>
+ * <tr>
+ * <td>insertAll</td>
+ * <td>No</td>
+ * <td>Yes, it is incremented by the number of keys read, which is equal
to the number of keys provided</td>
+ * <td>Yes, it is incremented by the number of keys inserted</td>
+ * </tr>
+ * <tr>
+ * <td>getAndPut, replace, getAndReplace</td>
+ * <td>No</td>
+ * <td>Yes</td>
+ * <td>Yes, if the value is inserted / replaced</td>
+ * </tr>
+ * <tr>
+ * <td>remove, delete</td>
+ * <td>No</td>
+ * <td>No</td>
+ * <td>Yes, if the method returns true</td>
+ * </tr>
+ * <tr>
+ *         <td>removeAll, deleteAll</td>
+ * <td>No</td>
+ * <td>No</td>
+ * <td>Yes, it is incremented by the number of keys removed</td>
+ * </tr>
+ * <tr>
+ * <td>conditional remove, deleteExact, deleteAllExact</td>
+ * <td>No</td>
+ * <td>Yes, it is incremented by the number of keys read, which is equal
to the number of keys provided</td>
+ * <td>Yes, it is incremented by the number of keys removed</td>
+ * </tr>
+ * <tr>
+ * <td>getAndRemove</td>
+ * <td>No</td>
+ * <td>Yes</td>
+ * <td>Yes, if the value is removed</td>
+ * </tr>
+ * </table>
+ *
+ * <i>Note: Only synchronous methods are listed. Asynchronous methods affect
the same metrics.</i>
+ */
+public class TableMetricSource extends AbstractMetricSource<Holder> {
+ /** Source name. */
+ public static final String SOURCE_NAME = "tables";
+
+ /** Metric names. */
+ public static final String RO_READS = "RoReads";
+ public static final String RW_READS = "RwReads";
+ public static final String WRITES = "Writes";
+
+ private final QualifiedName tableName;
+
+ /**
+ * Creates a new instance of {@link TableMetricSource}.
+ *
+ * @param tableName Qualified table name.
+ */
+ public TableMetricSource(QualifiedName tableName) {
+ super(SOURCE_NAME + '.' + tableName.toCanonicalForm(), "Table
metrics.", "tables");
+ this.tableName = tableName;
+ }
+
+ /**
+ * Returns the qualified name of the table.
+ *
+ * @return Qualified name of the table.
+ */
+ public QualifiedName qualifiedTableName() {
+ return tableName;
+ }
+
+ /**
+ * Increments a counter of reads.
+ *
+     * @param readOnly {@code true} if the read operation is executed within a read-only transaction, {@code false} otherwise.
+ */
+ public void onRead(boolean readOnly) {
+ Holder holder = holder();
+
+ if (holder != null) {
+ if (readOnly) {
+ holder.roReads.increment();
+ } else {
+ holder.rwReads.increment();
+ }
+ }
+ }
+
+ /**
+     * Adds the given {@code x} to the counter of reads.
+     *
+     * @param x Number of reads to add to the counter.
+     * @param readOnly {@code true} if the read operation is executed within a read-only transaction, {@code false} otherwise.
+ */
+ public void onRead(int x, boolean readOnly) {
+ Holder holder = holder();
+
+ if (holder != null) {
+ if (readOnly) {
+ holder.roReads.add(x);
+ } else {
+ holder.rwReads.add(x);
+ }
+ }
+ }
+
+ /**
+ * Increments a counter of writes.
+ */
+ public void onWrite() {
+ Holder holder = holder();
+
+ if (holder != null) {
+ holder.writes.increment();
+ }
+ }
+
+ /**
+     * Adds the given {@code x} to the counter of writes.
+     *
+     * @param x Number of writes to add to the counter.
+ */
+ public void onWrite(int x) {
+ Holder holder = holder();
+
+ if (holder != null) {
+ holder.writes.add(x);
+ }
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ /** Actual metrics holder. */
+ protected static class Holder implements
AbstractMetricSource.Holder<Holder> {
+        private final LongAdderMetric roReads = new LongAdderMetric(
+                RO_READS,
+                "The total number of reads executed within read-only transactions.");
+
+        private final LongAdderMetric rwReads = new LongAdderMetric(
+                RW_READS,
+                "The total number of reads executed within read-write transactions.");
+
+        private final LongAdderMetric writes = new LongAdderMetric(
+                WRITES,
+                "The total number of writes executed within read-write transactions.");
+
+ private final List<Metric> metrics = List.of(roReads, rwReads, writes);
+
+ @Override
+ public Iterable<Metric> metrics() {
+ return metrics;
+ }
+ }
+}
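
For a quick feel of the new source in isolation, here is a hedged usage sketch (the class name and the sample table name are made up; the constructor, enable(), onRead(...) and onWrite(...) calls are the ones defined above). Before enable() is called, holder() returns null, so the on*() methods are cheap no-ops:

    import org.apache.ignite.internal.metrics.MetricSet;
    import org.apache.ignite.internal.table.metrics.TableMetricSource;
    import org.apache.ignite.table.QualifiedName;

    class TableMetricSourceUsageSketch {
        public static void main(String[] args) {
            var source = new TableMetricSource(QualifiedName.fromSimple("ACCOUNTS"));

            source.onWrite(); // No-op: the source has not been enabled yet.

            MetricSet metricSet = source.enable();

            source.onRead(true);     // RoReads += 1
            source.onRead(3, false); // RwReads += 3
            source.onWrite(2);       // Writes  += 2

            // Exactly the three metrics declared in Holder: RoReads, RwReads, Writes.
            metricSet.forEach(metric -> System.out.println(metric.name()));
        }
    }
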
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 64895bf0194..ea9a719d6ab 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -101,6 +101,7 @@ import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.InternalClusterNodeImpl;
@@ -498,7 +499,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
partitionReplicaLifecycleManager,
new SystemPropertiesNodeProperties(),
minTimeCollectorService,
- systemDistributedConfiguration
+ systemDistributedConfiguration,
+ new NoOpMetricManager()
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 416cf5aaa70..c02f6a47f49 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.metastorage.Revisions;
import
org.apache.ignite.internal.metastorage.impl.MetaStorageRevisionListenerRegistry;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.InternalClusterNodeImpl;
@@ -923,7 +924,8 @@ public class TableManagerTest extends IgniteAbstractTest {
partitionReplicaLifecycleManager,
new SystemPropertiesNodeProperties(),
new MinimumRequiredTimeCollectorServiceImpl(),
- systemDistributedConfiguration
+ systemDistributedConfiguration,
+ new NoOpMetricManager()
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 8992f808425..56407da3710 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -114,6 +114,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockManager;
@@ -130,6 +131,7 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.table.QualifiedName;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeAll;
@@ -291,7 +293,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
mock(IndexMetaStorage.class),
new TestLowWatermark(),
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("test_table"))
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index 1511e38c555..767cec133eb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -109,6 +109,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockManager;
@@ -125,6 +126,7 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.table.QualifiedName;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeAll;
@@ -260,7 +262,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
mock(IndexMetaStorage.class),
new TestLowWatermark(),
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("test_table"))
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 1cb08db2bfe..30cddf6ba4e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -227,6 +227,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.StaleTransactionO
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -263,6 +264,7 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.tx.TransactionException;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
@@ -692,7 +694,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
indexMetaStorage,
lowWatermark,
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("test_table"))
);
kvMarshaller = marshallerFor(schemaDescriptor);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 6e4596ffd7c..097fe41f2b3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -192,6 +192,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -226,6 +227,7 @@ import
org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -661,7 +663,8 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
indexMetaStorage,
lowWatermark,
failureManager,
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("test_table"))
);
kvMarshaller = marshallerFor(schemaDescriptor);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index a1482277953..631795540dc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -100,6 +100,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -112,6 +113,7 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -235,7 +237,8 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled()
+ colocationEnabled(),
+ new TableMetricSource(QualifiedName.fromSimple(TABLE_NAME))
);
when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
@@ -344,7 +347,8 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
indexMetaStorage,
new TestLowWatermark(),
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("test_table"))
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 72620208e3c..082e723189f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -99,6 +100,7 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -247,7 +249,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled()
+ colocationEnabled(),
+ new TableMetricSource(QualifiedName.fromSimple("test"))
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java
new file mode 100644
index 00000000000..7e942f12da0
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.table.QualifiedName;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the metric source name and the table metric names.
+ * If you rename a metric or add a new one, please don't forget to update the corresponding documentation.
+ */
+public class TableMetricSourceTest {
+ private static final String TABLE_NAME = "test_table";
+
+ @Test
+ void testMetricSourceName() {
+ QualifiedName qualifiedTableName =
QualifiedName.fromSimple(TABLE_NAME);
+
+ var metricSource = new TableMetricSource(qualifiedTableName);
+
+ assertThat(TableMetricSource.SOURCE_NAME, is("tables"));
+ assertThat(metricSource.name(), is("tables." +
qualifiedTableName.toCanonicalForm()));
+ }
+
+ @Test
+ void testMetricNames() {
+ var metricSource = new
TableMetricSource(QualifiedName.fromSimple(TABLE_NAME));
+
+ MetricSet set = metricSource.enable();
+
+ assertThat(set, is(notNullValue()));
+
+ Set<String> expectedMetrics = Set.of(
+ "RwReads",
+ "RoReads",
+ "Writes");
+
+ var actualMetrics = new HashSet<String>();
+ set.forEach(m -> actualMetrics.add(m.name()));
+
+ assertThat(actualMetrics, is(expectedMetrics));
+ }
+}
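
If a caller also needs the counter values (as the integration test presumably does), the lookup would look roughly like the sketch below. MetricSet#get(String) and LongMetric#value() are assumed here to have their usual shapes in the metrics module; this patch does not show them, so verify before copying:

    import org.apache.ignite.internal.metrics.LongMetric;
    import org.apache.ignite.internal.metrics.MetricSet;
    import org.apache.ignite.internal.table.metrics.TableMetricSource;
    import org.apache.ignite.table.QualifiedName;

    class WritesCounterSketch {
        public static void main(String[] args) {
            var source = new TableMetricSource(QualifiedName.fromSimple("test_table"));

            MetricSet metricSet = source.enable();

            source.onWrite(5);

            // Assumed lookup API; not part of this change.
            LongMetric writes = metricSet.get(TableMetricSource.WRITES);

            System.out.println("Writes = " + writes.value()); // Expected: 5
        }
    }
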
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 6ea44e27a65..e7082817154 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -168,6 +168,7 @@ import
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxStateMeta;
@@ -684,7 +685,8 @@ public class ItTxTestCluster {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled()
+ colocationEnabled(),
+ new TableMetricSource(QualifiedName.fromSimple(tableName))
);
TableImpl table = new TableImpl(
@@ -1121,7 +1123,8 @@ public class ItTxTestCluster {
mock(IndexMetaStorage.class),
lowWatermark,
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("test_table"))
);
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0cf1f02cfc1..9188f9dc976 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -128,6 +128,7 @@ import
org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
@@ -143,6 +144,7 @@ import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -308,7 +310,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled()
+ colocationEnabled(),
+ new TableMetricSource(QualifiedName.fromSimple("test"))
);
RaftGroupService svc = mock(RaftGroupService.class);
@@ -494,7 +497,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(IndexMetaStorage.class),
new TestLowWatermark(),
mock(FailureProcessor.class),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new TableMetricSource(QualifiedName.fromSimple("dummy_table"))
);
if (enabledColocation) {