This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 31f5dbe3faf IGNITE-26067 Partition modification counter (#6563)
31f5dbe3faf is described below
commit 31f5dbe3faf5dcaa9e8c9641a397d400e7e9e725
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Mon Sep 15 12:33:26 2025 +0300
IGNITE-26067 Partition modification counter (#6563)
---
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../partition/replicator/fixtures/Node.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +-
.../ItPartitionModificationCounterMetricsTest.java | 369 +++++++++++++++++++++
.../distributed/PartitionModificationCounter.java | 120 +++++++
.../PartitionModificationCounterFactory.java | 52 +++
.../PartitionModificationCounterMetricSource.java | 81 +++++
.../table/distributed/StorageUpdateHandler.java | 24 +-
.../internal/table/distributed/TableManager.java | 55 ++-
.../internal/table/distributed/IndexBaseTest.java | 4 +-
.../PartitionModificationCounterTest.java | 135 ++++++++
.../table/distributed/StorageCleanupTest.java | 4 +-
.../distributed/StorageUpdateHandlerTest.java | 4 +-
.../distributed/TableManagerRecoveryTest.java | 3 +-
.../table/distributed/TableManagerTest.java | 3 +-
.../raft/PartitionCommandListenerTest.java | 7 +-
.../PartitionReplicaListenerIndexLockingTest.java | 4 +-
...itionReplicaListenerSortedIndexLockingTest.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 4 +-
.../ZonePartitionReplicaListenerTest.java | 4 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 4 +-
.../ignite/internal/table/TableTestUtils.java | 17 +
.../table/impl/DummyInternalTableImpl.java | 4 +-
24 files changed, 897 insertions(+), 23 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index d761b35bf3a..00eed3de3e0 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1649,7 +1649,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
nodeProperties,
minTimeCollectorService,
systemDistributedConfiguration,
- metricManager
+ metricManager,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
) {
@Override
protected TxStateStorage createTxStateTableStorage(
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 1a06ed18804..297ef1261a0 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -170,6 +170,7 @@ import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbSt
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -804,7 +805,8 @@ public class Node {
nodeProperties,
minTimeCollectorService,
systemDistributedConfiguration,
- metricManager
+ metricManager,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
) {
@Override
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 8e0bded6243..1e70034f487 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -212,6 +212,7 @@ import
org.apache.ignite.internal.storage.configurations.StorageExtensionConfigu
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.table.RecordBinaryViewImpl;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
@@ -812,7 +813,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
nodeProperties,
minTimeCollectorService,
systemDistributedConfiguration,
- metricManager
+ metricManager,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
);
var indexManager = new IndexManager(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4ec1efa464d..ce14609c05d 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -259,6 +259,7 @@ import
org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
import
org.apache.ignite.internal.table.distributed.PublicApiThreadingIgniteTables;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
@@ -1110,6 +1111,9 @@ public class IgniteImpl implements Ignite {
metricManager
);
+ PartitionModificationCounterFactory
partitionModificationCounterFactory =
+ new PartitionModificationCounterFactory(clockService::current);
+
distributedTblMgr = new TableManager(
name,
registry,
@@ -1149,7 +1153,8 @@ public class IgniteImpl implements Ignite {
nodeProperties,
minTimeCollectorService,
systemDistributedConfiguration,
- metricManager
+ metricManager,
+ partitionModificationCounterFactory
);
disasterRecoveryManager = new DisasterRecoveryManager(
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
new file mode 100644
index 00000000000..6c387d0b0b5
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.DEFAULT_MIN_STALE_ROWS_COUNT;
+import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_COUNTER;
+import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP;
+import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
+import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.tx.Transaction;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link PartitionModificationCounter partition modification
counter} metrics.
+ */
+public class ItPartitionModificationCounterMetricsTest extends
BaseSqlIntegrationTest {
+ private static final String ZONE_1_PART_NO_REPLICAS =
"zone_single_partition_no_replicas";
+ private static final String ZONE_1_PART_REPLICAS = "zone_single_partition";
+ private static final String ZONE_8_PART_NO_REPLICAS =
"zone_multi_partition";
+
+ private static final int UNDEFINED_METRIC_VALUE = -1;
+
+ @BeforeAll
+ void setupDistributionZones() {
+ sqlScript(
+ format("CREATE ZONE {} (PARTITIONS 1, REPLICAS {}) storage
profiles ['default'];", ZONE_1_PART_REPLICAS, initialNodes()),
+ format("CREATE ZONE {} (PARTITIONS 1, REPLICAS 1) storage
profiles ['default'];", ZONE_1_PART_NO_REPLICAS),
+ format("CREATE ZONE {} (PARTITIONS 8, REPLICAS 1) storage
profiles ['default'];", ZONE_8_PART_NO_REPLICAS)
+ );
+ }
+
+ @BeforeEach
+ void dropTables() {
+ dropAllTables();
+ }
+
+ /**
+ * Tests that counters are updated independently for tables in the same
zone.
+ */
+ @Test
+ void twoTablesInTheSameZone() {
+ String tab1 = "T1";
+ String tab2 = "T2";
+
+ sqlScript(
+ format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE
{};", tab1, ZONE_1_PART_NO_REPLICAS),
+ format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE
{};", tab2, ZONE_1_PART_NO_REPLICAS)
+ );
+
+ sql(format("INSERT INTO {} VALUES(0, 0), (1, 1);", tab1));
+
+ expectModsCount(tab1, 2L);
+ expectNextMilestone(tab1, DEFAULT_MIN_STALE_ROWS_COUNT);
+
+ sql(format("INSERT INTO {} VALUES(0, 0), (1, 1), (2, 2);", tab2));
+
+ expectModsCount(tab2, 3L);
+ expectNextMilestone(tab2, DEFAULT_MIN_STALE_ROWS_COUNT);
+
+ expectModsCount(tab1, 2L);
+ }
+
+ /**
+ * Tests that dropping and creating a table with the same name resets the
counter value.
+ */
+ @Test
+ void recreateTableWithTheSameName() {
+ String table = "test_table";
+
+ sqlScript(
+ format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE
{};", table, ZONE_1_PART_NO_REPLICAS),
+ "INSERT INTO test_table VALUES(0, 0), (1, 1);"
+ );
+ expectModsCount(table, 2);
+
+ sqlScript(
+ format("DROP TABLE {}", table),
+ format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE
{};", table, ZONE_1_PART_NO_REPLICAS)
+ );
+ expectModsCount(table, 0);
+
+ sql("INSERT INTO test_table VALUES(0, 0), (1, 1);");
+ expectModsCount(table, 2);
+ }
+
+ /**
+ * Tests that different types of updates are counted.
+ */
+ @Test
+ void differentUpdateTypes() {
+ String tabName = "test_table";
+ sql(format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};",
tabName, ZONE_8_PART_NO_REPLICAS));
+ KeyValueView<Integer, Integer> keyValueView =
CLUSTER.aliveNode().tables().table("test_table")
+ .keyValueView(Integer.class, Integer.class);
+
+ int expectedMods = 0;
+
+ // Implicit transaction.
+ {
+ sql("INSERT INTO test_table VALUES(0, 0);");
+ expectModsCount(tabName, ++expectedMods);
+
+ sql("UPDATE test_table SET val=1 WHERE id=0");
+ expectModsCount(tabName, ++expectedMods);
+
+ keyValueView.put(null, 0, 2);
+ expectModsCount(tabName, ++expectedMods);
+
+ sql("INSERT INTO test_table VALUES(1, 1), (2, 2);");
+ expectedMods += 2;
+ expectModsCount(tabName, expectedMods);
+
+ keyValueView.putAll(null, Map.of(3, 3, 4, 4, 5, 5));
+ expectedMods += 3;
+ expectModsCount(tabName, expectedMods);
+
+ sql("UPDATE test_table SET val=20 WHERE val = 2");
+ expectedMods += 2;
+ expectModsCount(tabName, expectedMods);
+
+ sql("DELETE FROM test_table");
+ expectedMods += 6;
+ expectModsCount(tabName, expectedMods);
+ }
+
+ // Explicit transaction.
+ {
+ {
+ Transaction tx = CLUSTER.aliveNode().transactions().begin();
+ sql(tx, "INSERT INTO test_table VALUES(0, 0);");
+ expectModsCount(tabName, expectedMods);
+ tx.commit();
+ expectModsCount(tabName, ++expectedMods);
+ }
+
+ {
+ Transaction tx = CLUSTER.aliveNode().transactions().begin();
+
+ sql(tx, "UPDATE test_table SET val=1 WHERE id=0");
+ keyValueView.put(tx, 0, 2);
+ sql(tx, "INSERT INTO test_table VALUES(1, 1), (2, 2);");
+ keyValueView.putAll(tx, Map.of(3, 3, 4, 4, 5, 5));
+ expectModsCount(tabName, expectedMods);
+
+ tx.commit();
+ expectedMods += 6;
+ expectModsCount(tabName, expectedMods);
+ }
+
+ {
+ Transaction tx = CLUSTER.aliveNode().transactions().begin();
+
+ sql(tx, "UPDATE test_table SET val=20 WHERE val = 2");
+ sql(tx, "DELETE FROM test_table");
+ expectModsCount(tabName, expectedMods);
+
+ tx.commit();
+ expectedMods += 6;
+ expectModsCount(tabName, expectedMods);
+ }
+ }
+
+ for (int part = 0; part < 8; part++) {
+ assertThat(metricFromAnyNode(tabName, part,
METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT));
+ }
+ }
+
+ /**
+ * Tests that the milestone timestamp is updated only when
+ * the number of modifications reaches the configured threshold.
+ */
+ @Test
+ void reachMilestoneUpdateTest() {
+ String tableWithReplicas = "TEST_TABLE";
+ String tableNoReplicas = "TEST_TABLE_NO_REPLICAS";
+
+ int replicas = initialNodes();
+
+ sqlScript(
+ format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE
{};", tableWithReplicas, ZONE_1_PART_REPLICAS),
+ format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE
{};", tableNoReplicas, ZONE_1_PART_NO_REPLICAS),
+ format("INSERT INTO {} VALUES(0, 0);", tableWithReplicas),
+ format("INSERT INTO {} VALUES(0, 0);", tableNoReplicas)
+ );
+
+ expectModsCount(tableNoReplicas, 1);
+ expectModsCount(tableWithReplicas, replicas);
+
+ long initTsNoReplicas = metricFromAnyNode(tableNoReplicas, 0,
METRIC_LAST_MILESTONE_TIMESTAMP);
+ long initTsWithReplicas = metricFromAnyNode(tableWithReplicas, 0,
METRIC_LAST_MILESTONE_TIMESTAMP);
+
+ long modsCount = DEFAULT_MIN_STALE_ROWS_COUNT / 2;
+
+ // Perform a bunch of modifications.
+ {
+ for (int i = 0; i < modsCount; i++) {
+ sql(format("UPDATE {} SET VAL=?", tableWithReplicas), i);
+ sql(format("UPDATE {} SET VAL=?", tableNoReplicas), i);
+ }
+
+ long expectedModsCount = 1 + modsCount;
+
+ expectModsCount(tableNoReplicas, expectedModsCount);
+ expectModsCount(tableWithReplicas, expectedModsCount * replicas);
+
+ // Timestamp should not change as we did not reach the threshold.
+ assertThat(metricFromAnyNode(tableNoReplicas, 0,
METRIC_LAST_MILESTONE_TIMESTAMP), is(initTsNoReplicas));
+ assertThat(metricFromAnyNode(tableWithReplicas, 0,
METRIC_LAST_MILESTONE_TIMESTAMP), is(initTsWithReplicas));
+
+ assertThat(metricFromAnyNode(tableNoReplicas, 0,
METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT));
+ assertThat(metricFromAnyNode(tableWithReplicas, 0,
METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT));
+ }
+
+ // Perform another bunch of modifications to reach the milestone.
+ {
+ for (int i = 0; i < modsCount; i++) {
+ sql(format("UPDATE {} SET VAL=?", tableWithReplicas), i);
+ sql(format("UPDATE {} SET VAL=?", tableNoReplicas), i);
+ }
+
+ long expectedModsCount = 1 + modsCount + modsCount;
+
+ expectModsCount(tableNoReplicas, expectedModsCount);
+ expectModsCount(tableWithReplicas, expectedModsCount * replicas);
+
+ // Timestamp should change because we reached the threshold.
+ Awaitility.await().untilAsserted(() ->
+ assertThat(metricFromAnyNode(tableNoReplicas, 0,
METRIC_LAST_MILESTONE_TIMESTAMP), greaterThan(initTsNoReplicas))
+ );
+ Awaitility.await().untilAsserted(() ->
+ assertThat(metricFromAnyNode(tableWithReplicas, 0,
METRIC_LAST_MILESTONE_TIMESTAMP), greaterThan(initTsWithReplicas))
+ );
+
+ Awaitility.await().untilAsserted(() ->
+ assertThat(metricFromAnyNode(tableNoReplicas, 0,
METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT * 2))
+ );
+ Awaitility.await().untilAsserted(() ->
+ assertThat(metricFromAnyNode(tableWithReplicas, 0,
METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT * 2))
+ );
+ }
+ }
+
+ private void expectModsCount(String tableName, long value) {
+ expectLongValue(tableName, value, METRIC_COUNTER);
+ }
+
+ private void expectNextMilestone(String tableName, long value) {
+ expectLongValue(tableName, value, METRIC_NEXT_MILESTONE);
+ }
+
+ private void expectLongValue(String tableName, long value, String
metricName) {
+ QualifiedName qualifiedName = QualifiedName.parse(tableName);
+ CatalogManager manager = unwrapIgniteImpl(node(0)).catalogManager();
+ Catalog catalog = manager.catalog(manager.latestCatalogVersion());
+ CatalogTableDescriptor tableDesc =
catalog.table(qualifiedName.schemaName(), qualifiedName.objectName());
+ int partsCount = catalog.zone(tableDesc.zoneId()).partitions();
+
+ Awaitility.await().untilAsserted(() -> {
+ long summaryValue = 0;
+
+ for (int part = 0; part < partsCount; part++) {
+ int tableId = tableIdByName(QualifiedName.parse(tableName));
+
+ String metricSourceName =
+
PartitionModificationCounterMetricSource.formatSourceName(tableId, part);
+
+ boolean metricFound = false;
+
+ for (int i = 0; i < CLUSTER.nodes().size(); i++) {
+ long metricValue = metricFromNode(i, tableName, part,
metricName);
+
+ if (metricValue != UNDEFINED_METRIC_VALUE) {
+ metricFound = true;
+
+ summaryValue += metricValue;
+ }
+ }
+
+ if (!metricFound) {
+ throw new IllegalArgumentException("Metrics not found " +
metricSourceName);
+ }
+ }
+
+ assertThat(summaryValue, is(value));
+ });
+ }
+
+ private int tableIdByName(QualifiedName qualifiedName) {
+ CatalogManager manager = unwrapIgniteImpl(node(0)).catalogManager();
+ Catalog catalog = manager.catalog(manager.latestCatalogVersion());
+ CatalogTableDescriptor tableDesc =
catalog.table(qualifiedName.schemaName(), qualifiedName.objectName());
+
+ assert tableDesc != null;
+
+ return tableDesc.id();
+ }
+
+ private long metricFromAnyNode(String tableName, int partId, String
metricName) {
+ for (int i = 0; i < CLUSTER.nodes().size(); i++) {
+ long value = metricFromNode(i, tableName, partId, metricName);
+
+ if (value != UNDEFINED_METRIC_VALUE) {
+ return value;
+ }
+ }
+
+ return UNDEFINED_METRIC_VALUE;
+ }
+
+ private long metricFromNode(int nodeIdx, String tableName, int partId,
String metricName) {
+ int tableId = tableIdByName(QualifiedName.parse(tableName));
+
+ String metricSourceName =
+
PartitionModificationCounterMetricSource.formatSourceName(tableId, partId);
+
+ MetricManager metricManager =
unwrapIgniteImpl(node(nodeIdx)).metricManager();
+
+ MetricSet metrics =
metricManager.metricSnapshot().metrics().get(metricSourceName);
+
+ if (metrics != null) {
+ LongMetric metric = metrics.get(metricName);
+ Objects.requireNonNull(metric, "metric does not exist: " +
metricName);
+
+ return metric.value();
+ }
+
+ return UNDEFINED_METRIC_VALUE;
+ }
+
+ private static void sqlScript(String ... queries) {
+ CLUSTER.aliveNode().sql().executeScript(String.join(";", queries));
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
new file mode 100644
index 00000000000..0c4311a6f63
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Keeps track of the number of modifications of a partition.
+ *
+ * <p>When the configured threshold value of the number of modifications is
reached, a timestamp corresponding
+ * to the commit time of the transaction that made this update is stored in
{@link #lastMilestoneReachedTimestamp}.
+ *
+ * <p>The timestamp value is used to determine the staleness of related SQL
statistics.
+ */
+public class PartitionModificationCounter {
+ private final LongSupplier partitionSizeSupplier;
+ private final double staleRowsFraction;
+ private final long minStaleRowsCount;
+
+ private final AtomicLong counter = new AtomicLong(0);
+ private volatile long nextMilestone;
+ private volatile HybridTimestamp lastMilestoneReachedTimestamp;
+
+ /** Constructor. */
+ public PartitionModificationCounter(
+ HybridTimestamp initTimestamp,
+ LongSupplier partitionSizeSupplier,
+ double staleRowsFraction,
+ long minStaleRowsCount
+ ) {
+ Objects.requireNonNull(initTimestamp, "initTimestamp");
+ Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
+
+ if (staleRowsFraction < 0 || staleRowsFraction > 1) {
+ throw new IllegalArgumentException("staleRowsFraction must be in
[0, 1] range");
+ }
+
+ if (minStaleRowsCount < 0) {
+ throw new IllegalArgumentException("minStaleRowsCount must be
non-negative");
+ }
+
+ this.staleRowsFraction = staleRowsFraction;
+ this.minStaleRowsCount = minStaleRowsCount;
+ this.partitionSizeSupplier = partitionSizeSupplier;
+
+ nextMilestone =
computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction,
minStaleRowsCount);
+ lastMilestoneReachedTimestamp = initTimestamp;
+ }
+
+ /** Returns the current counter value. */
+ public long value() {
+ return counter.get();
+ }
+
+ /**
+ * Returns a timestamp representing the commit time of the
+ * last transaction that caused the counter to reach a milestone.
+ *
+ * @return Timestamp of last milestone reached.
+ */
+ public HybridTimestamp lastMilestoneTimestamp() {
+ return lastMilestoneReachedTimestamp;
+ }
+
+ /** Returns the value of the next milestone. */
+ public long nextMilestone() {
+ return nextMilestone;
+ }
+
+ /**
+ * Adds the given value to the current counter value.
+ *
+ * @param delta The value to add.
+ * @param commitTimestamp The commit timestamp of the transaction that
made the modification.
+ */
+ public void updateValue(int delta, HybridTimestamp commitTimestamp) {
+ Objects.requireNonNull(commitTimestamp, "commitTimestamp");
+
+ if (delta < 0) {
+ throw new IllegalArgumentException("Delta must be non-negative.");
+ }
+
+ if (delta == 0) {
+ return;
+ }
+
+ long newCounter = counter.addAndGet(delta);
+
+ if (newCounter >= nextMilestone) {
+ this.nextMilestone = newCounter +
computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction,
minStaleRowsCount);
+ this.lastMilestoneReachedTimestamp = commitTimestamp;
+ }
+ }
+
+ private static long computeNextMilestone(
+ long currentSize,
+ double staleRowsFraction,
+ long minStaleRowsCount
+ ) {
+ return Math.max((long) (currentSize * staleRowsFraction),
minStaleRowsCount);
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
new file mode 100644
index 00000000000..496a24077f6
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Factory for producing {@link PartitionModificationCounter}.
+ */
+public class PartitionModificationCounterFactory {
+ public static final long DEFAULT_MIN_STALE_ROWS_COUNT = 500L;
+
+ public static final double DEFAULT_STALE_ROWS_FRACTION = 0.2d;
+
+ private final Supplier<HybridTimestamp> currentTimestampSupplier;
+
+ public PartitionModificationCounterFactory(Supplier<HybridTimestamp>
currentTimestampSupplier) {
+ this.currentTimestampSupplier = currentTimestampSupplier;
+ }
+
+ /**
+ * Creates a new partition modification counter.
+ *
+ * @param partitionSizeSupplier Partition size supplier.
+ * @return New partition modification counter.
+ */
+ public PartitionModificationCounter create(LongSupplier
partitionSizeSupplier) {
+ return new PartitionModificationCounter(
+ currentTimestampSupplier.get(),
+ partitionSizeSupplier,
+ DEFAULT_STALE_ROWS_FRACTION,
+ DEFAULT_MIN_STALE_ROWS_COUNT
+ );
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
new file mode 100644
index 00000000000..0d77f00d2f8
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metrics related to {@link PartitionModificationCounter partition
modification counter}.
+ */
+public class PartitionModificationCounterMetricSource implements MetricSource {
+ public static final String METRIC_COUNTER = "modificationCount";
+ public static final String METRIC_NEXT_MILESTONE = "nextMilestone";
+ public static final String METRIC_LAST_MILESTONE_TIMESTAMP =
"lastMilestoneTimestamp";
+
+ private final Map<String, Metric> metrics = new HashMap<>();
+ private final String metricSourceName;
+
+ private boolean enabled;
+
+ public PartitionModificationCounterMetricSource(int tableId, int
partitionId) {
+ this.metricSourceName = formatSourceName(tableId, partitionId);
+ }
+
+ @Override
+ public String name() {
+ return metricSourceName;
+ }
+
+ @Override
+ public @Nullable MetricSet enable() {
+ if (enabled) {
+ return null;
+ }
+
+ enabled = true;
+
+ return new MetricSet(metricSourceName, Map.copyOf(metrics));
+ }
+
+ @Override
+ public void disable() {
+ enabled = false;
+ }
+
+ @Override
+ public boolean enabled() {
+ return enabled;
+ }
+
+ /** Adds a metric to the source. */
+ public void addMetric(Metric metric) {
+ assert !enabled : "Metrics can be added only before enabling the
metric source";
+
+ metrics.put(metric.name(), metric);
+ }
+
+ public static String formatSourceName(int tableId, int partitionId) {
+ return
IgniteStringFormatter.format("partition.statistics.table.{}.partition.{}",
tableId, partitionId);
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index fe6fed202e0..9a8dcf91e96 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -66,6 +66,9 @@ public class StorageUpdateHandler {
/** Replication configuration. */
private final ReplicationConfiguration replicationConfiguration;
+ /** Partition modification counter. */
+ private final PartitionModificationCounter modificationCounter;
+
/**
* The constructor.
*
@@ -73,17 +76,20 @@ public class StorageUpdateHandler {
* @param storage Partition data storage.
* @param indexUpdateHandler Partition index update handler.
* @param replicationConfiguration Configuration for the replication.
+ * @param modificationCounter Partition modification counter.
*/
public StorageUpdateHandler(
int partitionId,
PartitionDataStorage storage,
IndexUpdateHandler indexUpdateHandler,
- ReplicationConfiguration replicationConfiguration
+ ReplicationConfiguration replicationConfiguration,
+ PartitionModificationCounter modificationCounter
) {
this.partitionId = partitionId;
this.storage = storage;
this.indexUpdateHandler = indexUpdateHandler;
this.replicationConfiguration = replicationConfiguration;
+ this.modificationCounter = modificationCounter;
}
/** Returns partition ID of the storage. */
@@ -132,7 +138,11 @@ public class StorageUpdateHandler {
if (trackWriteIntent) {
pendingRows.addPendingRowId(txId, rowId);
- }
+ } else
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No
need to check commiTs for null
+ if (commitTs != null) {
+ modificationCounter.updateValue(1, commitTs);
+ }
if (onApplication != null) {
onApplication.run();
@@ -263,7 +273,11 @@ public class StorageUpdateHandler {
if (trackWriteIntent) {
pendingRows.addPendingRowIds(txId, processedRowIds);
- }
+ } else
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No
need to check commiTs for null
+ if (commitTs != null) {
+ modificationCounter.updateValue(processedRowIds.size(),
commitTs);
+ }
if (entryToProcess == null && onApplication != null) {
onApplication.run();
@@ -377,6 +391,10 @@ public class StorageUpdateHandler {
assert commitTimestamp != null : "Commit timestamp is null: " + txId;
pendingRowIds.forEach(rowId -> storage.commitWrite(rowId,
commitTimestamp, txId));
+
+ if (!pendingRowIds.isEmpty()) {
+ modificationCounter.updateValue(pendingRowIds.size(),
commitTimestamp);
+ }
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3126ff450ba..f8a9d5fdaa3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -97,6 +97,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
@@ -147,6 +148,7 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metrics.LongGauge;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
@@ -467,6 +469,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final ReliableCatalogVersions reliableCatalogVersions;
private final MetricManager metricManager;
+ private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
+ private final Map<TablePartitionId,
PartitionModificationCounterMetricSource> partModCounterMetricSources = new
ConcurrentHashMap<>();
/**
* Creates a new table manager.
@@ -539,7 +543,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
NodeProperties nodeProperties,
MinimumRequiredTimeCollectorService minTimeCollectorService,
SystemDistributedConfiguration systemDistributedConfiguration,
- MetricManager metricManager
+ MetricManager metricManager,
+ PartitionModificationCounterFactory
partitionModificationCounterFactory
) {
this.topologyService = topologyService;
this.replicaMgr = replicaMgr;
@@ -570,6 +575,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.nodeProperties = nodeProperties;
this.minTimeCollectorService = minTimeCollectorService;
this.metricManager = metricManager;
+ this.partitionModificationCounterFactory =
partitionModificationCounterFactory;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -3051,6 +3057,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
minTimeCollectorService.removePartition(tablePartitionId);
+ PartitionModificationCounterMetricSource metricSource =
partModCounterMetricSources.remove(tablePartitionId);
+ if (metricSource != null) {
+ metricManager.unregisterSource(metricSource);
+ }
+
return mvGc.removeStorage(tablePartitionId);
});
}
@@ -3132,7 +3143,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return topologyService.localMember();
}
- private static PartitionUpdateHandlers createPartitionUpdateHandlers(
+ private PartitionUpdateHandlers createPartitionUpdateHandlers(
int partitionId,
PartitionDataStorage partitionDataStorage,
TableViewInternal table,
@@ -3145,16 +3156,54 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
GcUpdateHandler gcUpdateHandler = new
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
+ LongSupplier partSizeSupplier = () ->
partitionDataStorage.getStorage().estimatedSize();
+ PartitionModificationCounter modificationCounter =
partitionModificationCounterFactory.create(partSizeSupplier);
+ registerPartitionModificationCounterMetrics(table.tableId(),
partitionId, modificationCounter);
+
StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
partitionId,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ modificationCounter
);
return new PartitionUpdateHandlers(storageUpdateHandler,
indexUpdateHandler, gcUpdateHandler);
}
+ private void registerPartitionModificationCounterMetrics(
+ int tableId, int partitionId, PartitionModificationCounter
counter) {
+
+ PartitionModificationCounterMetricSource metricSource =
+ new PartitionModificationCounterMetricSource(tableId,
partitionId);
+
+ metricSource.addMetric(new LongGauge(
+ PartitionModificationCounterMetricSource.METRIC_COUNTER,
+ "The value of the volatile counter of partition modifications.
"
+ + "This value is used to determine staleness of the
related SQL statistics.",
+ counter::value
+ ));
+
+ metricSource.addMetric(new LongGauge(
+ PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE,
+ "The value of the next milestone for the number of partition
modifications. "
+ + "This value is used to determine staleness of the
related SQL statistics.",
+ counter::nextMilestone
+ ));
+
+ metricSource.addMetric(new LongGauge(
+
PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP,
+ "The timestamp value representing the commit time of the last
modification operation that "
+ + "reached the milestone. This value is used to
determine staleness of the related SQL statistics.",
+ () -> counter.lastMilestoneTimestamp().longValue()
+ ));
+
+ metricManager.registerSource(metricSource);
+ metricManager.enable(metricSource);
+
+ partModCounterMetricSources.put(new TablePartitionId(tableId,
partitionId), metricSource);
+ }
+
/**
* Returns a cached table instance if it exists, {@code null} otherwise.
Can return a table that is being stopped.
*
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 255140ce324..438bb3fe804 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -180,7 +181,8 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
PARTITION_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
TestStorageUtils.completeBuiltIndexes(storage, hashInnerStorage,
sortedInnerStorage);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
new file mode 100644
index 00000000000..40c0e31ef85
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for class {@link PartitionModificationCounter}.
+ */
+public class PartitionModificationCounterTest extends BaseIgniteAbstractTest {
+ private final PartitionModificationCounterFactory factory =
+ new PartitionModificationCounterFactory(() ->
HybridTimestamp.hybridTimestamp(1L));
+
+ @Test
+ void initialValues() {
+ // Empty table.
+ {
+ PartitionModificationCounter counter = factory.create(() -> 0L);
+
+ assertThat(counter.value(), is(0L));
+ assertThat(counter.nextMilestone(),
is(PartitionModificationCounterFactory.DEFAULT_MIN_STALE_ROWS_COUNT));
+ assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+ }
+
+ // Table with 10k rows.
+ {
+ PartitionModificationCounter counter = factory.create(() ->
10_000L);
+
+ assertThat(counter.value(), is(0L));
+ assertThat(counter.nextMilestone(), is(2000L));
+ assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+
+ // A zero update should not change the counter values.
+ counter.updateValue(0, HybridTimestamp.MAX_VALUE);
+
+ assertThat(counter.value(), is(0L));
+ assertThat(counter.nextMilestone(), is(2000L));
+ assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+ }
+ }
+
+ @Test
+ void lastMilestoneTimestampUpdate() {
+ int rowsCount = 10_000;
+ int threshold = (int) (rowsCount *
PartitionModificationCounterFactory.DEFAULT_STALE_ROWS_FRACTION);
+ PartitionModificationCounter counter = factory.create(() -> rowsCount);
+
+ assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+
+ {
+ HybridTimestamp commitTime = HybridTimestamp.hybridTimestamp(100L);
+
+ counter.updateValue(threshold, commitTime);
+
+ assertThat(counter.value(), is(2_000L));
+ assertThat(counter.nextMilestone(), is(4_000L));
+ assertThat(counter.lastMilestoneTimestamp().longValue(),
is(commitTime.longValue()));
+ }
+
+ {
+ HybridTimestamp commitTime = HybridTimestamp.hybridTimestamp(200L);
+
+ counter.updateValue(threshold, commitTime);
+ assertThat(counter.value(), is(4_000L));
+ assertThat(counter.nextMilestone(), is(6_000L));
+ assertThat(counter.lastMilestoneTimestamp().longValue(),
is(commitTime.longValue()));
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"ThrowableNotThrown",
"ResultOfObjectAllocationIgnored", "DataFlowIssue"})
+ void invalidUpdateValues() {
+ PartitionModificationCounter counter = factory.create(() -> 0L);
+
+ IgniteTestUtils.assertThrows(NullPointerException.class,
+ () -> counter.updateValue(1, null), "commitTimestamp");
+
+ IgniteTestUtils.assertThrows(
+ IllegalArgumentException.class,
+ () -> counter.updateValue(-1, HybridTimestamp.MIN_VALUE),
+ "Delta must be non-negative"
+ );
+
+ IgniteTestUtils.assertThrows(
+ NullPointerException.class,
+ () -> new PartitionModificationCounter(null, () -> 0L, 0.0d,
0),
+ "initTimestamp"
+ );
+
+ IgniteTestUtils.assertThrows(
+ NullPointerException.class,
+ () -> new
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, null, 0.0d, 0),
+ "partitionSizeSupplier"
+ );
+
+ IgniteTestUtils.assertThrows(
+ IllegalArgumentException.class,
+ () -> new
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, 1.1d, 0),
+ "staleRowsFraction must be in [0, 1] range"
+ );
+
+ IgniteTestUtils.assertThrows(
+ IllegalArgumentException.class,
+ () -> new
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, 0),
+ "staleRowsFraction must be in [0, 1] range"
+ );
+
+ IgniteTestUtils.assertThrows(
+ IllegalArgumentException.class,
+ () -> new
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, -1),
+ "staleRowsFraction must be in [0, 1] range"
+ );
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 8c798b37567..502f85ad9e4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -58,6 +58,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.type.NativeTypes;
@@ -172,7 +173,8 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
PARTITION_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 8190b8e61d1..ec24635ff6b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -52,6 +52,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.Sto
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.storage.util.LockByRowId;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.type.NativeTypes;
@@ -164,7 +165,8 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
PARTITION_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 0c05751f230..c1246a7ddd7 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -500,7 +500,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
new SystemPropertiesNodeProperties(),
minTimeCollectorService,
systemDistributedConfiguration,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 273d2bf1297..47fdd6720cf 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -925,7 +925,8 @@ public class TableManagerTest extends IgniteAbstractTest {
new SystemPropertiesNodeProperties(),
new MinimumRequiredTimeCollectorServiceImpl(),
systemDistributedConfiguration,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 0df8f604b0e..12873a82feb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -123,6 +123,7 @@ import
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMeta;
@@ -262,7 +263,8 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
PARTITION_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
));
catalogService = mock(CatalogService.class);
@@ -522,7 +524,8 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
PARTITION_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
LeasePlacementDriver placementDriver =
mock(LeasePlacementDriver.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 56407da3710..5ff851166ca 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -102,6 +102,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
@@ -280,7 +281,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
PART_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
new DummyValidationSchemasSource(schemaManager),
localNode,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index 767cec133eb..54acdd64e82 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -98,6 +98,7 @@ import
org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -249,7 +250,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
PART_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
new DummyValidationSchemasSource(schemaManager),
localNode,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 47f842b134f..a67e31e8b37 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -212,6 +212,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
@@ -681,7 +682,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
PART_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
validationSchemasSource,
localNode,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index d5394fd45b5..9b011cc0e96 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -181,6 +181,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
@@ -650,7 +651,8 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
PART_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
validationSchemasSource,
localNode,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e7082817154..f1e481000b8 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -151,6 +151,7 @@ import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.Stora
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -759,7 +760,8 @@ public class ItTxTestCluster {
partId,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schemaDescriptor);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 387f4d4f9a6..988ab12a642 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -25,6 +25,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.List;
+import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
@@ -42,7 +43,10 @@ import
org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.sql.SqlCommon;
+import
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
+import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;
@@ -60,6 +64,19 @@ public class TableTestUtils {
/** Column name. */
public static final String COLUMN_NAME = "TEST_COLUMN";
+ /** No-op partition modification counter. */
+ public static final PartitionModificationCounter
NOOP_PARTITION_MODIFICATION_COUNTER =
+ new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () ->
0, 0, 0);
+
+ /** No-op partition modification counter factory. */
+ public static PartitionModificationCounterFactory
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
+ new PartitionModificationCounterFactory(() ->
HybridTimestamp.MIN_VALUE) {
+ @Override
+ public PartitionModificationCounter create(LongSupplier
partitionSizeSupplier) {
+ return NOOP_PARTITION_MODIFICATION_COUNTER;
+ }
+ };
+
/**
* Creates table in the catalog.
*
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 31d87ad173c..50ea759096e 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -116,6 +116,7 @@ import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -448,7 +449,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
PART_ID,
partitionDataStorage,
indexUpdateHandler,
- replicationConfiguration
+ replicationConfiguration,
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schema);