This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 31f5dbe3faf IGNITE-26067 Partition modification counter (#6563)
31f5dbe3faf is described below

commit 31f5dbe3faf5dcaa9e8c9641a397d400e7e9e725
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Mon Sep 15 12:33:26 2025 +0300

    IGNITE-26067 Partition modification counter (#6563)
---
 .../rebalance/ItRebalanceDistributedTest.java      |   3 +-
 .../partition/replicator/fixtures/Node.java        |   4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   7 +-
 .../ItPartitionModificationCounterMetricsTest.java | 369 +++++++++++++++++++++
 .../distributed/PartitionModificationCounter.java  | 120 +++++++
 .../PartitionModificationCounterFactory.java       |  52 +++
 .../PartitionModificationCounterMetricSource.java  |  81 +++++
 .../table/distributed/StorageUpdateHandler.java    |  24 +-
 .../internal/table/distributed/TableManager.java   |  55 ++-
 .../internal/table/distributed/IndexBaseTest.java  |   4 +-
 .../PartitionModificationCounterTest.java          | 135 ++++++++
 .../table/distributed/StorageCleanupTest.java      |   4 +-
 .../distributed/StorageUpdateHandlerTest.java      |   4 +-
 .../distributed/TableManagerRecoveryTest.java      |   3 +-
 .../table/distributed/TableManagerTest.java        |   3 +-
 .../raft/PartitionCommandListenerTest.java         |   7 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   4 +-
 ...itionReplicaListenerSortedIndexLockingTest.java |   4 +-
 .../replication/PartitionReplicaListenerTest.java  |   4 +-
 .../ZonePartitionReplicaListenerTest.java          |   4 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |   4 +-
 .../ignite/internal/table/TableTestUtils.java      |  17 +
 .../table/impl/DummyInternalTableImpl.java         |   4 +-
 24 files changed, 897 insertions(+), 23 deletions(-)

diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index d761b35bf3a..00eed3de3e0 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1649,7 +1649,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                     nodeProperties,
                     minTimeCollectorService,
                     systemDistributedConfiguration,
-                    metricManager
+                    metricManager,
+                    TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
             ) {
                 @Override
                 protected TxStateStorage createTxStateTableStorage(
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 1a06ed18804..297ef1261a0 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -170,6 +170,7 @@ import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbSt
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -804,7 +805,8 @@ public class Node {
                 nodeProperties,
                 minTimeCollectorService,
                 systemDistributedConfiguration,
-                metricManager
+                metricManager,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
         ) {
 
             @Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 8e0bded6243..1e70034f487 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -212,6 +212,7 @@ import org.apache.ignite.internal.storage.configurations.StorageExtensionConfigu
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.table.RecordBinaryViewImpl;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
@@ -812,7 +813,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 nodeProperties,
                 minTimeCollectorService,
                 systemDistributedConfiguration,
-                metricManager
+                metricManager,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
         );
 
         var indexManager = new IndexManager(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4ec1efa464d..ce14609c05d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -259,6 +259,7 @@ import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
 import 
org.apache.ignite.internal.table.distributed.PublicApiThreadingIgniteTables;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
@@ -1110,6 +1111,9 @@ public class IgniteImpl implements Ignite {
                 metricManager
         );
 
+        PartitionModificationCounterFactory 
partitionModificationCounterFactory =
+                new PartitionModificationCounterFactory(clockService::current);
+
         distributedTblMgr = new TableManager(
                 name,
                 registry,
@@ -1149,7 +1153,8 @@ public class IgniteImpl implements Ignite {
                 nodeProperties,
                 minTimeCollectorService,
                 systemDistributedConfiguration,
-                metricManager
+                metricManager,
+                partitionModificationCounterFactory
         );
 
         disasterRecoveryManager = new DisasterRecoveryManager(
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
new file mode 100644
index 00000000000..6c387d0b0b5
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.DEFAULT_MIN_STALE_ROWS_COUNT;
+import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_COUNTER;
+import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP;
+import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
+import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.tx.Transaction;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link PartitionModificationCounter partition modification counter} metrics.
+ */
+public class ItPartitionModificationCounterMetricsTest extends BaseSqlIntegrationTest {
+    private static final String ZONE_1_PART_NO_REPLICAS = "zone_single_partition_no_replicas";
+    private static final String ZONE_1_PART_REPLICAS = "zone_single_partition";
+    private static final String ZONE_8_PART_NO_REPLICAS = "zone_multi_partition";
+
+    /** Sentinel returned by the metric lookup helpers when a node does not expose the requested metric source. */
+    private static final int UNDEFINED_METRIC_VALUE = -1;
+
+    @BeforeAll
+    void setupDistributionZones() {
+        sqlScript(
+                format("CREATE ZONE {} (PARTITIONS 1, REPLICAS {}) storage profiles ['default'];", ZONE_1_PART_REPLICAS, initialNodes()),
+                format("CREATE ZONE {} (PARTITIONS 1, REPLICAS 1) storage profiles ['default'];", ZONE_1_PART_NO_REPLICAS),
+                format("CREATE ZONE {} (PARTITIONS 8, REPLICAS 1) storage profiles ['default'];", ZONE_8_PART_NO_REPLICAS)
+        );
+    }
+
+    @BeforeEach
+    void dropTables() {
+        dropAllTables();
+    }
+
+    /**
+     * Tests that counters are updated independently for tables in the same zone.
+     */
+    @Test
+    void twoTablesInTheSameZone() {
+        String tab1 = "T1";
+        String tab2 = "T2";
+
+        sqlScript(
+                format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", tab1, ZONE_1_PART_NO_REPLICAS),
+                format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", tab2, ZONE_1_PART_NO_REPLICAS)
+        );
+
+        sql(format("INSERT INTO {} VALUES(0, 0), (1, 1);", tab1));
+
+        expectModsCount(tab1, 2L);
+        expectNextMilestone(tab1, DEFAULT_MIN_STALE_ROWS_COUNT);
+
+        sql(format("INSERT INTO {} VALUES(0, 0), (1, 1), (2, 2);", tab2));
+
+        expectModsCount(tab2, 3L);
+        expectNextMilestone(tab2, DEFAULT_MIN_STALE_ROWS_COUNT);
+
+        expectModsCount(tab1, 2L);
+    }
+
+    /**
+     * Tests that dropping and creating a table with the same name resets the counter value.
+     */
+    @Test
+    void recreateTableWithTheSameName() {
+        String table = "test_table";
+
+        sqlScript(
+                format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", table, ZONE_1_PART_NO_REPLICAS),
+                "INSERT INTO test_table VALUES(0, 0), (1, 1);"
+        );
+        expectModsCount(table, 2);
+
+        sqlScript(
+                format("DROP TABLE {}", table),
+                format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", table, ZONE_1_PART_NO_REPLICAS)
+        );
+        expectModsCount(table, 0);
+
+        sql("INSERT INTO test_table VALUES(0, 0), (1, 1);");
+        expectModsCount(table, 2);
+    }
+
+    /**
+     * Tests that different types of updates are counted.
+     */
+    @Test
+    void differentUpdateTypes() {
+        String tabName = "test_table";
+        sql(format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", tabName, ZONE_8_PART_NO_REPLICAS));
+        KeyValueView<Integer, Integer> keyValueView = CLUSTER.aliveNode().tables().table("test_table")
+                .keyValueView(Integer.class, Integer.class);
+
+        int expectedMods = 0;
+
+        // Implicit transaction.
+        {
+            sql("INSERT INTO test_table VALUES(0, 0);");
+            expectModsCount(tabName, ++expectedMods);
+
+            sql("UPDATE test_table SET val=1 WHERE id=0");
+            expectModsCount(tabName, ++expectedMods);
+
+            keyValueView.put(null, 0, 2);
+            expectModsCount(tabName, ++expectedMods);
+
+            sql("INSERT INTO test_table VALUES(1, 1), (2, 2);");
+            expectedMods += 2;
+            expectModsCount(tabName, expectedMods);
+
+            keyValueView.putAll(null, Map.of(3, 3, 4, 4, 5, 5));
+            expectedMods += 3;
+            expectModsCount(tabName, expectedMods);
+
+            sql("UPDATE test_table SET val=20 WHERE val = 2");
+            expectedMods += 2;
+            expectModsCount(tabName, expectedMods);
+
+            sql("DELETE FROM test_table");
+            expectedMods += 6;
+            expectModsCount(tabName, expectedMods);
+        }
+
+        // Explicit transaction.
+        {
+            {
+                Transaction tx = CLUSTER.aliveNode().transactions().begin();
+                sql(tx, "INSERT INTO test_table VALUES(0, 0);");
+                // Nothing is counted until the transaction commits.
+                expectModsCount(tabName, expectedMods);
+                tx.commit();
+                expectModsCount(tabName, ++expectedMods);
+            }
+
+            {
+                Transaction tx = CLUSTER.aliveNode().transactions().begin();
+
+                sql(tx, "UPDATE test_table SET val=1 WHERE id=0");
+                keyValueView.put(tx, 0, 2);
+                sql(tx, "INSERT INTO test_table VALUES(1, 1), (2, 2);");
+                keyValueView.putAll(tx, Map.of(3, 3, 4, 4, 5, 5));
+                expectModsCount(tabName, expectedMods);
+
+                tx.commit();
+                expectedMods += 6;
+                expectModsCount(tabName, expectedMods);
+            }
+
+            {
+                Transaction tx = CLUSTER.aliveNode().transactions().begin();
+
+                sql(tx, "UPDATE test_table SET val=20 WHERE val = 2");
+                sql(tx, "DELETE FROM test_table");
+                expectModsCount(tabName, expectedMods);
+
+                tx.commit();
+                expectedMods += 6;
+                expectModsCount(tabName, expectedMods);
+            }
+        }
+
+        for (int part = 0; part < 8; part++) {
+            assertThat(metricFromAnyNode(tabName, part, METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT));
+        }
+    }
+
+    /**
+     * Tests that the milestone timestamp is updated only when
+     * the number of modifications reaches the configured threshold.
+     */
+    @Test
+    void reachMilestoneUpdateTest() {
+        String tableWithReplicas = "TEST_TABLE";
+        String tableNoReplicas = "TEST_TABLE_NO_REPLICAS";
+
+        int replicas = initialNodes();
+
+        sqlScript(
+                format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", tableWithReplicas, ZONE_1_PART_REPLICAS),
+                format("CREATE TABLE {}(id INT PRIMARY KEY, val INT) ZONE {};", tableNoReplicas, ZONE_1_PART_NO_REPLICAS),
+                format("INSERT INTO {} VALUES(0, 0);", tableWithReplicas),
+                format("INSERT INTO {} VALUES(0, 0);", tableNoReplicas)
+        );
+
+        expectModsCount(tableNoReplicas, 1);
+        expectModsCount(tableWithReplicas, replicas);
+
+        long initTsNoReplicas = metricFromAnyNode(tableNoReplicas, 0, METRIC_LAST_MILESTONE_TIMESTAMP);
+        long initTsWithReplicas = metricFromAnyNode(tableWithReplicas, 0, METRIC_LAST_MILESTONE_TIMESTAMP);
+
+        long modsCount = DEFAULT_MIN_STALE_ROWS_COUNT / 2;
+
+        // Perform a bunch of modifications.
+        {
+            for (int i = 0; i < modsCount; i++) {
+                sql(format("UPDATE  {} SET VAL=?", tableWithReplicas), i);
+                sql(format("UPDATE  {} SET VAL=?", tableNoReplicas), i);
+            }
+
+            long expectedModsCount = 1 + modsCount;
+
+            expectModsCount(tableNoReplicas, expectedModsCount);
+            expectModsCount(tableWithReplicas, expectedModsCount * replicas);
+
+            // Timestamp should not change as we did not reach the threshold.
+            assertThat(metricFromAnyNode(tableNoReplicas, 0, METRIC_LAST_MILESTONE_TIMESTAMP), is(initTsNoReplicas));
+            assertThat(metricFromAnyNode(tableWithReplicas, 0, METRIC_LAST_MILESTONE_TIMESTAMP), is(initTsWithReplicas));
+
+            assertThat(metricFromAnyNode(tableNoReplicas, 0, METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT));
+            assertThat(metricFromAnyNode(tableWithReplicas, 0, METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT));
+        }
+
+        // Perform another bunch of modifications to reach the milestone.
+        {
+            for (int i = 0; i < modsCount; i++) {
+                sql(format("UPDATE  {} SET VAL=?", tableWithReplicas), i);
+                sql(format("UPDATE  {} SET VAL=?", tableNoReplicas), i);
+            }
+
+            long expectedModsCount = 1 + modsCount + modsCount;
+
+            expectModsCount(tableNoReplicas, expectedModsCount);
+            expectModsCount(tableWithReplicas, expectedModsCount * replicas);
+
+            // Timestamp should change because we reached the threshold.
+            Awaitility.await().untilAsserted(() ->
+                    assertThat(metricFromAnyNode(tableNoReplicas, 0, METRIC_LAST_MILESTONE_TIMESTAMP), greaterThan(initTsNoReplicas))
+            );
+            Awaitility.await().untilAsserted(() ->
+                    assertThat(metricFromAnyNode(tableWithReplicas, 0, METRIC_LAST_MILESTONE_TIMESTAMP), greaterThan(initTsWithReplicas))
+            );
+
+            Awaitility.await().untilAsserted(() ->
+                    assertThat(metricFromAnyNode(tableNoReplicas, 0, METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT * 2))
+            );
+            Awaitility.await().untilAsserted(() ->
+                    assertThat(metricFromAnyNode(tableWithReplicas, 0, METRIC_NEXT_MILESTONE), is(DEFAULT_MIN_STALE_ROWS_COUNT * 2))
+            );
+        }
+    }
+
+    /** Waits until the {@code modificationCount} metric, summed over all partitions and nodes, equals {@code value}. */
+    private void expectModsCount(String tableName, long value) {
+        expectLongValue(tableName, value, METRIC_COUNTER);
+    }
+
+    /** Waits until the {@code nextMilestone} metric, summed over all partitions and nodes, equals {@code value}. */
+    private void expectNextMilestone(String tableName, long value) {
+        expectLongValue(tableName, value, METRIC_NEXT_MILESTONE);
+    }
+
+    /**
+     * Polls the cluster until the given metric, summed over every partition of the table and every node
+     * that exposes the corresponding metric source, equals the expected value.
+     *
+     * @param tableName Table name (parsed with {@link QualifiedName#parse}).
+     * @param value Expected sum.
+     * @param metricName Name of the metric inside the partition metric source.
+     */
+    private void expectLongValue(String tableName, long value, String metricName) {
+        QualifiedName qualifiedName = QualifiedName.parse(tableName);
+        CatalogManager manager = unwrapIgniteImpl(node(0)).catalogManager();
+        Catalog catalog = manager.catalog(manager.latestCatalogVersion());
+        CatalogTableDescriptor tableDesc = catalog.table(qualifiedName.schemaName(), qualifiedName.objectName());
+
+        Objects.requireNonNull(tableDesc, "table does not exist: " + tableName);
+
+        // Resolved once: the descriptor does not change while we are polling.
+        int tableId = tableDesc.id();
+        int partsCount = catalog.zone(tableDesc.zoneId()).partitions();
+
+        Awaitility.await().untilAsserted(() -> {
+            long summaryValue = 0;
+
+            for (int part = 0; part < partsCount; part++) {
+                boolean metricFound = false;
+
+                for (int i = 0; i < CLUSTER.nodes().size(); i++) {
+                    long metricValue = metricFromNode(i, tableName, part, metricName);
+
+                    if (metricValue != UNDEFINED_METRIC_VALUE) {
+                        metricFound = true;
+
+                        summaryValue += metricValue;
+                    }
+                }
+
+                if (!metricFound) {
+                    // Must be an AssertionError: untilAsserted() retries only on assertion failures, any other
+                    // exception aborts the wait immediately, which would fail the test if the metric source
+                    // has simply not been registered yet.
+                    throw new AssertionError("Metrics not found "
+                            + PartitionModificationCounterMetricSource.formatSourceName(tableId, part));
+                }
+            }
+
+            assertThat(summaryValue, is(value));
+        });
+    }
+
+    /** Resolves the table ID from the latest catalog version known to node 0. */
+    private int tableIdByName(QualifiedName qualifiedName) {
+        CatalogManager manager = unwrapIgniteImpl(node(0)).catalogManager();
+        Catalog catalog = manager.catalog(manager.latestCatalogVersion());
+        CatalogTableDescriptor tableDesc = catalog.table(qualifiedName.schemaName(), qualifiedName.objectName());
+
+        assert tableDesc != null;
+
+        return tableDesc.id();
+    }
+
+    /**
+     * Returns the metric value from the first node (in index order) that exposes the partition metric source,
+     * or {@link #UNDEFINED_METRIC_VALUE} if no node does.
+     */
+    private long metricFromAnyNode(String tableName, int partId, String metricName) {
+        for (int i = 0; i < CLUSTER.nodes().size(); i++) {
+            long value = metricFromNode(i, tableName, partId, metricName);
+
+            if (value != UNDEFINED_METRIC_VALUE) {
+                return value;
+            }
+        }
+
+        return UNDEFINED_METRIC_VALUE;
+    }
+
+    /**
+     * Reads a single metric of the given table partition from the metric snapshot of the given node.
+     *
+     * @return Metric value, or {@link #UNDEFINED_METRIC_VALUE} if the node's snapshot has no such metric source.
+     * @throws NullPointerException If the metric source is present but does not contain {@code metricName}.
+     */
+    private long metricFromNode(int nodeIdx, String tableName, int partId, String metricName) {
+        int tableId = tableIdByName(QualifiedName.parse(tableName));
+
+        String metricSourceName =
+                PartitionModificationCounterMetricSource.formatSourceName(tableId, partId);
+
+        MetricManager metricManager = unwrapIgniteImpl(node(nodeIdx)).metricManager();
+
+        MetricSet metrics = metricManager.metricSnapshot().metrics().get(metricSourceName);
+
+        if (metrics != null) {
+            LongMetric metric = metrics.get(metricName);
+            Objects.requireNonNull(metric, "metric does not exist: " + metricName);
+
+            return metric.value();
+        }
+
+        return UNDEFINED_METRIC_VALUE;
+    }
+
+    /** Executes the given statements as a single script (joined with {@code ;}) on an alive node. */
+    private static void sqlScript(String ... queries) {
+        CLUSTER.aliveNode().sql().executeScript(String.join(";", queries));
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
new file mode 100644
index 00000000000..0c4311a6f63
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Keeps track of the number of modifications of a partition.
+ *
+ * <p>When the configured threshold value of the number of modifications is reached, a timestamp corresponding
+ * to the commit time of the transaction that made this update is stored in {@link #lastMilestoneReachedTimestamp}.
+ *
+ * <p>The timestamp value is used to determine the staleness of related SQL statistics.
+ */
+public class PartitionModificationCounter {
+    /** Supplies the current partition size; queried on construction and every time a milestone is reached. */
+    private final LongSupplier partitionSizeSupplier;
+
+    /** Fraction of the partition size, in {@code [0, 1]}, used to compute the distance to the next milestone. */
+    private final double staleRowsFraction;
+
+    /** Lower bound for the distance to the next milestone. */
+    private final long minStaleRowsCount;
+
+    /** Total number of modifications registered so far. */
+    private final AtomicLong counter = new AtomicLong(0);
+
+    /** Counter value at which the next milestone is reached. */
+    private volatile long nextMilestone;
+
+    /** Initial timestamp, or the commit timestamp passed with the update that reached the last milestone. */
+    private volatile HybridTimestamp lastMilestoneReachedTimestamp;
+
+    /**
+     * Constructor.
+     *
+     * @param initTimestamp Timestamp reported by {@link #lastMilestoneTimestamp()} until the first milestone is reached.
+     * @param partitionSizeSupplier Partition size supplier.
+     * @param staleRowsFraction Fraction of the partition size that defines the distance between milestones, in {@code [0, 1]}.
+     * @param minStaleRowsCount Minimal distance between milestones, non-negative.
+     * @throws NullPointerException If {@code initTimestamp} or {@code partitionSizeSupplier} is {@code null}.
+     * @throws IllegalArgumentException If {@code staleRowsFraction} is outside of {@code [0, 1]} (including {@code NaN})
+     *         or {@code minStaleRowsCount} is negative.
+     */
+    public PartitionModificationCounter(
+            HybridTimestamp initTimestamp,
+            LongSupplier partitionSizeSupplier,
+            double staleRowsFraction,
+            long minStaleRowsCount
+    ) {
+        Objects.requireNonNull(initTimestamp, "initTimestamp");
+        Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
+
+        // Written as a negated "in range" test so that NaN, for which every comparison is false, is rejected as well.
+        if (!(staleRowsFraction >= 0 && staleRowsFraction <= 1)) {
+            throw new IllegalArgumentException("staleRowsFraction must be in [0, 1] range");
+        }
+
+        if (minStaleRowsCount < 0) {
+            throw new IllegalArgumentException("minStaleRowsCount must be non-negative");
+        }
+
+        this.staleRowsFraction = staleRowsFraction;
+        this.minStaleRowsCount = minStaleRowsCount;
+        this.partitionSizeSupplier = partitionSizeSupplier;
+
+        // The counter starts at zero, so the first milestone is simply the computed distance.
+        nextMilestone = computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction, minStaleRowsCount);
+        lastMilestoneReachedTimestamp = initTimestamp;
+    }
+
+    /** Returns the current counter value. */
+    public long value() {
+        return counter.get();
+    }
+
+    /**
+     * Returns a timestamp representing the commit time of the
+     * last transaction that caused the counter to reach a milestone.
+     *
+     * @return Timestamp of last milestone reached.
+     */
+    public HybridTimestamp lastMilestoneTimestamp() {
+        return lastMilestoneReachedTimestamp;
+    }
+
+    /** Returns the value of the next milestone. */
+    public long nextMilestone() {
+        return nextMilestone;
+    }
+
+    /**
+     * Adds the given value to the current counter value. If the new value reaches the current milestone,
+     * the milestone is moved forward and {@code commitTimestamp} is remembered as the last milestone timestamp.
+     *
+     * @param delta The value to add. A zero delta is a no-op.
+     * @param commitTimestamp The commit timestamp of the transaction that made the modification.
+     * @throws NullPointerException If {@code commitTimestamp} is {@code null}.
+     * @throws IllegalArgumentException If {@code delta} is negative.
+     */
+    public void updateValue(int delta, HybridTimestamp commitTimestamp) {
+        Objects.requireNonNull(commitTimestamp, "commitTimestamp");
+
+        if (delta < 0) {
+            throw new IllegalArgumentException("Delta must be non-negative.");
+        }
+
+        if (delta == 0) {
+            return;
+        }
+
+        long newCounter = counter.addAndGet(delta);
+
+        // NOTE(review): the check and the two writes below are not atomic as a whole; presumably updates of a single
+        // partition are serialized by the caller and the volatile fields only serve concurrent readers - confirm.
+        if (newCounter >= nextMilestone) {
+            this.nextMilestone = newCounter + computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction, minStaleRowsCount);
+            this.lastMilestoneReachedTimestamp = commitTimestamp;
+        }
+    }
+
+    /**
+     * Computes the number of modifications between two milestones: the given fraction of the current
+     * partition size (truncated to a whole number), but not less than {@code minStaleRowsCount}.
+     */
+    private static long computeNextMilestone(
+            long currentSize,
+            double staleRowsFraction,
+            long minStaleRowsCount
+    ) {
+        return Math.max((long) (currentSize * staleRowsFraction), minStaleRowsCount);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
new file mode 100644
index 00000000000..496a24077f6
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Factory for producing {@link PartitionModificationCounter}.
+ */
+public class PartitionModificationCounterFactory {
+    /** Value passed as {@code minStaleRowsCount} to every counter created by this factory. */
+    public static final long DEFAULT_MIN_STALE_ROWS_COUNT = 500L;
+
+    /** Value passed as {@code staleRowsFraction} to every counter created by this factory. */
+    public static final double DEFAULT_STALE_ROWS_FRACTION = 0.2d;
+
+    /** Queried once per {@link #create} call to obtain the initial timestamp of the new counter. */
+    private final Supplier<HybridTimestamp> currentTimestampSupplier;
+
+    /**
+     * Constructor.
+     *
+     * @param currentTimestampSupplier Supplier of the timestamp used to initialize created counters.
+     */
+    public PartitionModificationCounterFactory(Supplier<HybridTimestamp> 
currentTimestampSupplier) {
+        this.currentTimestampSupplier = currentTimestampSupplier;
+    }
+
+    /**
+     * Creates a new partition modification counter initialized with the timestamp obtained from the
+     * timestamp supplier, {@link #DEFAULT_STALE_ROWS_FRACTION} and {@link #DEFAULT_MIN_STALE_ROWS_COUNT}.
+     *
+     * @param partitionSizeSupplier Partition size supplier.
+     * @return New partition modification counter.
+     */
+    public PartitionModificationCounter create(LongSupplier 
partitionSizeSupplier) {
+        return new PartitionModificationCounter(
+                currentTimestampSupplier.get(),
+                partitionSizeSupplier,
+                DEFAULT_STALE_ROWS_FRACTION,
+                DEFAULT_MIN_STALE_ROWS_COUNT
+        );
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
new file mode 100644
index 00000000000..0d77f00d2f8
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metrics related to {@link PartitionModificationCounter partition 
modification counter}.
+ */
+public class PartitionModificationCounterMetricSource implements MetricSource {
+    public static final String METRIC_COUNTER = "modificationCount";
+    public static final String METRIC_NEXT_MILESTONE = "nextMilestone";
+    public static final String METRIC_LAST_MILESTONE_TIMESTAMP = 
"lastMilestoneTimestamp";
+
+    private final Map<String, Metric> metrics = new HashMap<>(); // Keyed by metric name; copied when the source is enabled.
+    private final String metricSourceName;
+
+    private boolean enabled; // NOTE(review): plain field, no synchronization visible here - confirm callers serialize access.
+
+    public PartitionModificationCounterMetricSource(int tableId, int 
partitionId) {
+        this.metricSourceName = formatSourceName(tableId, partitionId);
+    }
+
+    @Override
+    public String name() {
+        return metricSourceName;
+    }
+
+    @Override
+    public @Nullable MetricSet enable() {
+        if (enabled) {
+            return null; // Already enabled: no new metric set is produced.
+        }
+
+        enabled = true;
+
+        return new MetricSet(metricSourceName, Map.copyOf(metrics)); // Unmodifiable copy of the metrics added so far.
+    }
+
+    @Override
+    public void disable() {
+        enabled = false;
+    }
+
+    @Override
+    public boolean enabled() {
+        return enabled;
+    }
+
+    /** Adds a metric to the source under its name, replacing any same-named metric; only allowed while the source is not enabled. */
+    public void addMetric(Metric metric) {
+        assert !enabled : "Metrics can be added only before enabling the 
metric source";
+
+        metrics.put(metric.name(), metric);
+    }
+
+    public static String formatSourceName(int tableId, int partitionId) { // Presumably fills the {} placeholders in argument order.
+        return 
IgniteStringFormatter.format("partition.statistics.table.{}.partition.{}", 
tableId, partitionId);
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index fe6fed202e0..9a8dcf91e96 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -66,6 +66,9 @@ public class StorageUpdateHandler {
     /** Replication configuration. */
     private final ReplicationConfiguration replicationConfiguration;
 
+    /** Partition modification counter. */
+    private final PartitionModificationCounter modificationCounter;
+
     /**
      * The constructor.
      *
@@ -73,17 +76,20 @@ public class StorageUpdateHandler {
      * @param storage Partition data storage.
      * @param indexUpdateHandler Partition index update handler.
      * @param replicationConfiguration Configuration for the replication.
+     * @param modificationCounter Partition modification counter.
      */
     public StorageUpdateHandler(
             int partitionId,
             PartitionDataStorage storage,
             IndexUpdateHandler indexUpdateHandler,
-            ReplicationConfiguration replicationConfiguration
+            ReplicationConfiguration replicationConfiguration,
+            PartitionModificationCounter modificationCounter
     ) {
         this.partitionId = partitionId;
         this.storage = storage;
         this.indexUpdateHandler = indexUpdateHandler;
         this.replicationConfiguration = replicationConfiguration;
+        this.modificationCounter = modificationCounter;
     }
 
     /** Returns partition ID of the storage. */
@@ -132,7 +138,11 @@ public class StorageUpdateHandler {
 
             if (trackWriteIntent) {
                 pendingRows.addPendingRowId(txId, rowId);
-            }
+            } else
+                // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No 
need to check commitTs for null
+                if (commitTs != null) {
+                    modificationCounter.updateValue(1, commitTs);
+                }
 
             if (onApplication != null) {
                 onApplication.run();
@@ -263,7 +273,11 @@ public class StorageUpdateHandler {
 
             if (trackWriteIntent) {
                 pendingRows.addPendingRowIds(txId, processedRowIds);
-            }
+            } else
+                // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No 
need to check commitTs for null
+                if (commitTs != null) {
+                    modificationCounter.updateValue(processedRowIds.size(), 
commitTs);
+                }
 
             if (entryToProcess == null && onApplication != null) {
                 onApplication.run();
@@ -377,6 +391,10 @@ public class StorageUpdateHandler {
         assert commitTimestamp != null : "Commit timestamp is null: " + txId;
 
         pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
commitTimestamp, txId));
+
+        if (!pendingRowIds.isEmpty()) {
+            modificationCounter.updateValue(pendingRowIds.size(), 
commitTimestamp);
+        }
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3126ff450ba..f8a9d5fdaa3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -97,6 +97,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
@@ -147,6 +148,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metrics.LongGauge;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
@@ -467,6 +469,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     private final ReliableCatalogVersions reliableCatalogVersions;
 
     private final MetricManager metricManager;
+    private final PartitionModificationCounterFactory 
partitionModificationCounterFactory;
+    private final Map<TablePartitionId, 
PartitionModificationCounterMetricSource> partModCounterMetricSources = new 
ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
@@ -539,7 +543,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             NodeProperties nodeProperties,
             MinimumRequiredTimeCollectorService minTimeCollectorService,
             SystemDistributedConfiguration systemDistributedConfiguration,
-            MetricManager metricManager
+            MetricManager metricManager,
+            PartitionModificationCounterFactory 
partitionModificationCounterFactory
     ) {
         this.topologyService = topologyService;
         this.replicaMgr = replicaMgr;
@@ -570,6 +575,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.nodeProperties = nodeProperties;
         this.minTimeCollectorService = minTimeCollectorService;
         this.metricManager = metricManager;
+        this.partitionModificationCounterFactory = 
partitionModificationCounterFactory;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -3051,6 +3057,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                     minTimeCollectorService.removePartition(tablePartitionId);
 
+                    PartitionModificationCounterMetricSource metricSource = 
partModCounterMetricSources.remove(tablePartitionId);
+                    if (metricSource != null) {
+                        metricManager.unregisterSource(metricSource);
+                    }
+
                     return mvGc.removeStorage(tablePartitionId);
                 });
     }
@@ -3132,7 +3143,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return topologyService.localMember();
     }
 
-    private static PartitionUpdateHandlers createPartitionUpdateHandlers(
+    private PartitionUpdateHandlers createPartitionUpdateHandlers(
             int partitionId,
             PartitionDataStorage partitionDataStorage,
             TableViewInternal table,
@@ -3145,16 +3156,54 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         GcUpdateHandler gcUpdateHandler = new 
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
 
+        LongSupplier partSizeSupplier = () -> 
partitionDataStorage.getStorage().estimatedSize();
+        PartitionModificationCounter modificationCounter = 
partitionModificationCounterFactory.create(partSizeSupplier);
+        registerPartitionModificationCounterMetrics(table.tableId(), 
partitionId, modificationCounter);
+
         StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                 partitionId,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                modificationCounter
         );
 
         return new PartitionUpdateHandlers(storageUpdateHandler, 
indexUpdateHandler, gcUpdateHandler);
     }
 
+    private void registerPartitionModificationCounterMetrics(
+            int tableId, int partitionId, PartitionModificationCounter 
counter) {
+        // Publishes the counter's value, next milestone and last milestone timestamp as gauges of a per-partition source.
+        PartitionModificationCounterMetricSource metricSource =
+                new PartitionModificationCounterMetricSource(tableId, 
partitionId);
+
+        metricSource.addMetric(new LongGauge(
+                PartitionModificationCounterMetricSource.METRIC_COUNTER,
+                "The value of the volatile counter of partition modifications. 
"
+                        + "This value is used to determine staleness of the 
related SQL statistics.",
+                counter::value
+        ));
+
+        metricSource.addMetric(new LongGauge(
+                PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE,
+                "The value of the next milestone for the number of partition 
modifications. "
+                        + "This value is used to determine staleness of the 
related SQL statistics.",
+                counter::nextMilestone
+        ));
+
+        metricSource.addMetric(new LongGauge(
+                
PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP,
+                "The timestamp value representing the commit time of the last 
modification operation that "
+                        + "reached the milestone. This value is used to 
determine staleness of the related SQL statistics.",
+                () -> counter.lastMilestoneTimestamp().longValue()
+        ));
+
+        metricManager.registerSource(metricSource);
+        metricManager.enable(metricSource); // All gauges are added above: addMetric asserts the source is not yet enabled.
+
+        partModCounterMetricSources.put(new TablePartitionId(tableId, 
partitionId), metricSource); // Remembered so the source can be unregistered when the partition is removed.
+    }
+
     /**
      * Returns a cached table instance if it exists, {@code null} otherwise. 
Can return a table that is being stopped.
      *
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 255140ce324..438bb3fe804 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -180,7 +181,8 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
                 PARTITION_ID,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
         );
 
         TestStorageUtils.completeBuiltIndexes(storage, hashInnerStorage, 
sortedInnerStorage);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
new file mode 100644
index 00000000000..40c0e31ef85
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for class {@link PartitionModificationCounter}.
+ */
+public class PartitionModificationCounterTest extends BaseIgniteAbstractTest {
+    private final PartitionModificationCounterFactory factory =
+            new PartitionModificationCounterFactory(() -> 
HybridTimestamp.hybridTimestamp(1L)); // Every counter created below starts with timestamp 1.
+
+    @Test
+    void initialValues() {
+        // Empty table: the first milestone equals the default minimum stale rows count.
+        {
+            PartitionModificationCounter counter = factory.create(() -> 0L);
+
+            assertThat(counter.value(), is(0L));
+            assertThat(counter.nextMilestone(), 
is(PartitionModificationCounterFactory.DEFAULT_MIN_STALE_ROWS_COUNT));
+            assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+        }
+
+        // Table with 10k rows: the first milestone is 2000, i.e. the default fraction (0.2) of the size.
+        {
+            PartitionModificationCounter counter = factory.create(() -> 
10_000L);
+
+            assertThat(counter.value(), is(0L));
+            assertThat(counter.nextMilestone(), is(2000L));
+            assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+
+            // A zero update should not change the counter values.
+            counter.updateValue(0, HybridTimestamp.MAX_VALUE);
+
+            assertThat(counter.value(), is(0L));
+            assertThat(counter.nextMilestone(), is(2000L));
+            assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+        }
+    }
+
+    @Test
+    void lastMilestoneTimestampUpdate() {
+        int rowsCount = 10_000;
+        int threshold = (int) (rowsCount * 
PartitionModificationCounterFactory.DEFAULT_STALE_ROWS_FRACTION);
+        PartitionModificationCounter counter = factory.create(() -> rowsCount);
+
+        assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
+
+        {
+            HybridTimestamp commitTime = HybridTimestamp.hybridTimestamp(100L);
+
+            counter.updateValue(threshold, commitTime); // Brings the value exactly to the first milestone (2000).
+
+            assertThat(counter.value(), is(2_000L));
+            assertThat(counter.nextMilestone(), is(4_000L));
+            assertThat(counter.lastMilestoneTimestamp().longValue(), 
is(commitTime.longValue()));
+        }
+
+        {
+            HybridTimestamp commitTime = HybridTimestamp.hybridTimestamp(200L);
+
+            counter.updateValue(threshold, commitTime); // Brings the value exactly to the second milestone (4000).
+            assertThat(counter.value(), is(4_000L));
+            assertThat(counter.nextMilestone(), is(6_000L));
+            assertThat(counter.lastMilestoneTimestamp().longValue(), 
is(commitTime.longValue()));
+        }
+    }
+
+    @Test
+    @SuppressWarnings({"ThrowableNotThrown", 
"ResultOfObjectAllocationIgnored", "DataFlowIssue"})
+    void invalidUpdateValues() {
+        PartitionModificationCounter counter = factory.create(() -> 0L);
+
+        IgniteTestUtils.assertThrows(NullPointerException.class,
+                () -> counter.updateValue(1, null), "commitTimestamp");
+
+        IgniteTestUtils.assertThrows(
+                IllegalArgumentException.class,
+                () -> counter.updateValue(-1, HybridTimestamp.MIN_VALUE),
+                "Delta must be non-negative"
+        );
+
+        IgniteTestUtils.assertThrows(
+                NullPointerException.class,
+                () -> new PartitionModificationCounter(null, () -> 0L, 0.0d, 
0),
+                "initTimestamp"
+        );
+
+        IgniteTestUtils.assertThrows(
+                NullPointerException.class,
+                () -> new 
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, null, 0.0d, 0),
+                "partitionSizeSupplier"
+        );
+
+        IgniteTestUtils.assertThrows(
+                IllegalArgumentException.class,
+                () -> new 
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, 1.1d, 0),
+                "staleRowsFraction must be in [0, 1] range"
+        );
+
+        IgniteTestUtils.assertThrows(
+                IllegalArgumentException.class,
+                () -> new 
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, 0),
+                "staleRowsFraction must be in [0, 1] range"
+        );
+
+        IgniteTestUtils.assertThrows( // NOTE(review): the fraction is invalid here too, so the negative minimum count is never checked alone - confirm intent.
+                IllegalArgumentException.class,
+                () -> new 
PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0L, -0.1d, -1),
+                "staleRowsFraction must be in [0, 1] range"
+        );
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 8c798b37567..502f85ad9e4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -58,6 +58,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -172,7 +173,8 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
                 PARTITION_ID,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 8190b8e61d1..ec24635ff6b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.Sto
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.storage.util.LockByRowId;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -164,7 +165,8 @@ public class StorageUpdateHandlerTest extends 
BaseMvStoragesTest {
                 PARTITION_ID,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 0c05751f230..c1246a7ddd7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -500,7 +500,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 new SystemPropertiesNodeProperties(),
                 minTimeCollectorService,
                 systemDistributedConfiguration,
-                new NoOpMetricManager()
+                new NoOpMetricManager(),
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 273d2bf1297..47fdd6720cf 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -925,7 +925,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 new SystemPropertiesNodeProperties(),
                 new MinimumRequiredTimeCollectorServiceImpl(),
                 systemDistributedConfiguration,
-                new NoOpMetricManager()
+                new NoOpMetricManager(),
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 0df8f604b0e..12873a82feb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -123,6 +123,7 @@ import 
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.index.IndexMeta;
@@ -262,7 +263,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 PARTITION_ID,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
         ));
 
         catalogService = mock(CatalogService.class);
@@ -522,7 +524,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 PARTITION_ID,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
         );
 
         LeasePlacementDriver placementDriver = 
mock(LeasePlacementDriver.class);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 56407da3710..5ff851166ca 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -102,6 +102,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
@@ -280,7 +281,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         PART_ID,
                         partitionDataStorage,
                         indexUpdateHandler,
-                        replicationConfiguration
+                        replicationConfiguration,
+                        TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 new DummyValidationSchemasSource(schemaManager),
                 localNode,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index 767cec133eb..54acdd64e82 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -98,6 +98,7 @@ import 
org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -249,7 +250,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest 
extends IgniteAbstra
                         PART_ID,
                         partitionDataStorage,
                         indexUpdateHandler,
-                        replicationConfiguration
+                        replicationConfiguration,
+                        TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 new DummyValidationSchemasSource(schemaManager),
                 localNode,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 47f842b134f..a67e31e8b37 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -212,6 +212,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
@@ -681,7 +682,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         PART_ID,
                         partitionDataStorage,
                         indexUpdateHandler,
-                        replicationConfiguration
+                        replicationConfiguration,
+                        TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 validationSchemasSource,
                 localNode,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index d5394fd45b5..9b011cc0e96 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -181,6 +181,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
@@ -650,7 +651,8 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         PART_ID,
                         partitionDataStorage,
                         indexUpdateHandler,
-                        replicationConfiguration
+                        replicationConfiguration,
+                        TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 ),
                 validationSchemasSource,
                 localNode,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e7082817154..f1e481000b8 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -151,6 +151,7 @@ import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.Stora
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -759,7 +760,8 @@ public class ItTxTestCluster {
                         partId,
                         partitionDataStorage,
                         indexUpdateHandler,
-                        replicationConfiguration
+                        replicationConfiguration,
+                        TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 );
 
                 DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 387f4d4f9a6..988ab12a642 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -25,6 +25,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.util.List;
+import java.util.function.LongSupplier;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -42,7 +43,10 @@ import 
org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.sql.SqlCommon;
+import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
+import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
 import org.apache.ignite.sql.ColumnType;
 import org.jetbrains.annotations.Nullable;
 
@@ -60,6 +64,19 @@ public class TableTestUtils {
     /** Column name. */
     public static final String COLUMN_NAME = "TEST_COLUMN";
 
+    /** No-op partition modification counter. */
+    public static final PartitionModificationCounter 
NOOP_PARTITION_MODIFICATION_COUNTER =
+            new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 
0, 0, 0);
+
+    /** No-op partition modification counter factory that always returns {@link #NOOP_PARTITION_MODIFICATION_COUNTER}. */
+    public static final PartitionModificationCounterFactory 
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
+            new PartitionModificationCounterFactory(() -> 
HybridTimestamp.MIN_VALUE) {
+                @Override
+                public PartitionModificationCounter create(LongSupplier 
partitionSizeSupplier) {
+                    return NOOP_PARTITION_MODIFICATION_COUNTER;
+                }
+            };
+
     /**
      * Creates table in the catalog.
      *
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 31d87ad173c..50ea759096e 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -116,6 +116,7 @@ import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -448,7 +449,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 PART_ID,
                 partitionDataStorage,
                 indexUpdateHandler,
-                replicationConfiguration
+                replicationConfiguration,
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
         );
 
         DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schema);

Reply via email to