This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4dddc4d3516 IGNITE-27668 Refactor TableMetricSource to use common interface for writable entities (#7479)
4dddc4d3516 is described below

commit 4dddc4d351607c90e36c2d4ebda1c611cb638e2b
Author: Phillippko <[email protected]>
AuthorDate: Tue Jan 27 17:06:24 2026 +0700

    IGNITE-27668 Refactor TableMetricSource to use common interface for writable entities (#7479)
---
 .../ignite/client/fakes/FakeInternalTable.java     |  4 +-
 .../internal/table/metrics/ItTableMetricsTest.java | 38 ++++++---
 .../ignite/internal/table/InternalTable.java       |  4 +-
 .../apache/ignite/internal/table/TableImpl.java    |  4 +-
 .../ignite/internal/table/TableViewInternal.java   |  4 +-
 .../internal/table/distributed/TableManager.java   | 14 ++--
 .../replicator/PartitionReplicaListener.java       | 92 ++++++++++++----------
 .../distributed/storage/InternalTableImpl.java     |  8 +-
 .../table/metrics/ReadWriteMetricSource.java       | 59 ++++++++++++++
 .../internal/table/metrics/TableMetricSource.java  | 48 +++++------
 10 files changed, 176 insertions(+), 99 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 2512b566314..ed7c997a7df 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -58,7 +58,7 @@ import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.DataStreamerReceiverDescriptor;
@@ -524,7 +524,7 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
     }
 
     @Override
-    public TableMetricSource metrics() {
+    public ReadWriteMetricSource metrics() {
         return null;
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
index 7d18249ec38..31787fe49a5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
@@ -171,14 +171,14 @@ public class ItTableMetricsTest extends 
ClusterPerClassIntegrationTest {
 
     @Test
     void put() {
-        testKeyValueViewOperation(WRITES, 1, view -> view.put(null, 42, 
"value_42"));
+        testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
1L), view -> view.put(null, 42, "value_42"));
     }
 
     @Test
     void putAll() {
         Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
 
-        testKeyValueViewOperation(WRITES, values.size(), view -> 
view.putAll(null, values));
+        testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
(long) values.size()), view -> view.putAll(null, values));
     }
 
     @Test
@@ -193,10 +193,10 @@ public class ItTableMetricsTest extends 
ClusterPerClassIntegrationTest {
         kvView.put(null, key, "value_42");
 
         // Remove existing key.
-        testKeyValueViewOperation(WRITES, 1, view -> view.remove(null, key));
+        testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
1L), view -> view.remove(null, key));
 
         // Remove non existing key.
-        testKeyValueViewOperation(WRITES, 0, view -> view.remove(null, key));
+        testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
0L), view -> view.remove(null, key));
     }
 
     @Test
@@ -223,7 +223,13 @@ public class ItTableMetricsTest extends 
ClusterPerClassIntegrationTest {
         kvView.removeAll(null);
         kvView.putAll(null, values);
 
-        testKeyValueViewOperation(WRITES, values.size(), view -> 
view.removeAll(null));
+        // TODO https://issues.apache.org/jira/browse/IGNITE-27670 Fix 
removeAll effect on read metrics.
+        // Reads happen when batch is retrieved, even though removeAll 
shouldn't update read metrics.
+        testKeyValueViewOperation(
+                of(RO_READS, RW_READS, WRITES),
+                of(0L, (long) values.size(), (long) values.size()),
+                view -> view.removeAll(null)
+        );
     }
 
     @Test
@@ -234,16 +240,24 @@ public class ItTableMetricsTest extends 
ClusterPerClassIntegrationTest {
         kvView.putAll(null, values);
 
         // Remove existing keys.
-        testKeyValueViewOperation(WRITES, values.size(), view -> 
view.removeAll(null, values.keySet()));
+        testKeyValueViewOperation(
+                of(RO_READS, RW_READS, WRITES),
+                of(0L, 0L, (long) values.size()),
+                view -> view.removeAll(null, values.keySet())
+        );
 
         // Remove non-existing keys.
-        testKeyValueViewOperation(WRITES, 0, view -> view.removeAll(null, 
values.keySet()));
+        testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
0L), view -> view.removeAll(null, values.keySet()));
 
         kvView.putAll(null, values);
 
         // Remove non-unique keys.
         List<Integer> nonUniqueKeys = of(12, 15, 12, 17, 19, 23);
-        testKeyValueViewOperation(WRITES, nonUniqueKeys.size() - 1, view -> 
view.removeAll(null, nonUniqueKeys));
+        testKeyValueViewOperation(
+                of(RO_READS, RW_READS, WRITES),
+                of(0L, 0L, nonUniqueKeys.size() - 1L),
+                view -> view.removeAll(null, nonUniqueKeys)
+        );
     }
 
     @Test
@@ -367,15 +381,15 @@ public class ItTableMetricsTest extends 
ClusterPerClassIntegrationTest {
         recordView(0).upsertAll(null, recs);
 
         // Delete existing keys.
-        testRecordViewOperation(WRITES, recs.size(), view -> 
view.deleteAll(null, keys));
+        testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
((long) recs.size())), view -> view.deleteAll(null, keys));
 
         // Delete non-existing keys.
-        testRecordViewOperation(WRITES, 0L, view -> view.deleteAll(null, 
keys));
+        testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
0L), view -> view.deleteAll(null, keys));
 
         recordView(0).insert(null, recs.get(0));
 
         // Delete one non-existing key.
-        testRecordViewOperation(WRITES, 1L, view -> view.deleteAll(null, 
keys));
+        testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
1L), view -> view.deleteAll(null, keys));
 
         // Non-unique keys.
         List<Tuple> nonUniqueKeys = of(
@@ -389,7 +403,7 @@ public class ItTableMetricsTest extends 
ClusterPerClassIntegrationTest {
 
         recordView(0).upsertAll(null, nonUniqueRecs);
 
-        testRecordViewOperation(WRITES, 2L, view -> view.deleteAll(null, 
nonUniqueKeys));
+        testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L, 
2L), view -> view.deleteAll(null, nonUniqueKeys));
     }
 
     @Test
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index bfd2e21bff1..64ccd5e9f9b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.tx.TransactionException;
@@ -454,5 +454,5 @@ public interface InternalTable extends ManuallyCloseable {
      *
      * @return Table metrics source.
      */
-    TableMetricSource metrics();
+    ReadWriteMetricSource metrics();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 26a163aa705..d5cb5d3bd9c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -47,7 +47,7 @@ import 
org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import 
org.apache.ignite.internal.table.distributed.TableStatsStalenessConfiguration;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.sql.IgniteSql;
@@ -321,7 +321,7 @@ public class TableImpl implements TableViewInternal {
     }
 
     @Override
-    public TableMetricSource metrics() {
+    public ReadWriteMetricSource metrics() {
         return tbl.metrics();
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 7ef9edccb2a..6729fc53354 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -27,7 +27,7 @@ import 
org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableStatsStalenessConfiguration;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
@@ -136,7 +136,7 @@ public interface TableViewInternal extends Table {
      *
      * @return Table metrics source.
      */
-    TableMetricSource metrics();
+    ReadWriteMetricSource metrics();
 
     /**
      * Updates staleness configuration with provided parameters.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 39b917c0505..0de3cbddb2d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -168,6 +168,7 @@ import 
org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
@@ -1192,7 +1193,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 Objects.requireNonNull(streamerReceiverRunner),
                 () -> txCfg.value().readWriteTimeoutMillis(),
                 () -> txCfg.value().readOnlyTimeoutMillis(),
-                
createAndRegisterMetricsSource(tableStorage.getTableDescriptor(), tableName)
+                createAndRegisterMetricsSource(tableStorage, tableName)
         );
 
         CatalogTableProperties descProps = tableDescriptor.properties();
@@ -2009,11 +2010,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
-    private TableMetricSource 
createAndRegisterMetricsSource(StorageTableDescriptor tableDescriptor, 
QualifiedName tableName) {
+    private ReadWriteMetricSource 
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName 
tableName) {
+        StorageTableDescriptor tableDescriptor = 
tableStorage.getTableDescriptor();
+
+        CatalogTableDescriptor catalogTableDescriptor = 
catalogService.latestCatalog().table(tableDescriptor.getId());
+
         // The table might be created during the recovery phase.
         // In that case, we should only register the metric source for the 
actual tables that exist in the latest catalog.
-        boolean registrationNeeded =
-                catalogService.latestCatalog().table(tableDescriptor.getId()) 
!= null;
+        boolean registrationNeeded = catalogTableDescriptor != null;
 
         StorageEngine engine = 
dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
 
@@ -2032,7 +2036,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             }
         }
 
-        TableMetricSource source = new TableMetricSource(tableName);
+        ReadWriteMetricSource source = new TableMetricSource(tableName);
 
         if (registrationNeeded) {
             try {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9bd4da24b5f..3d7e5a0ab50 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -177,7 +177,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.TableUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.tx.DelayedAckException;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockException;
@@ -320,7 +320,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
     private static final boolean SKIP_UPDATES = 
getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
 
-    private final TableMetricSource metrics;
+    private final ReadWriteMetricSource metrics;
 
     private final TableAwareReplicaRequestPreProcessor 
tableAwareReplicaRequestPreProcessor;
     private final ReliableCatalogVersions reliableCatalogVersions;
@@ -380,7 +380,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             IndexMetaStorage indexMetaStorage,
             LowWatermark lowWatermark,
             FailureProcessor failureProcessor,
-            TableMetricSource metrics
+            ReadWriteMetricSource metrics
     ) {
         this.mvDataStorage = mvDataStorage;
         this.txManager = txManager;
@@ -582,7 +582,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                         } else {
                             return 
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
                                     .thenApply(ignored -> {
-                                        metrics.onRead(rows.size(), false);
+                                        metrics.onRead(rows.size(), false, 
true);
 
                                         return rows;
                                     });
@@ -651,7 +651,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 return safeReadFuture
                         .thenCompose(unused -> lookupIndex(request, 
indexStorage))
                         .thenApply(rows -> {
-                            metrics.onRead(rows.size(), true);
+                            metrics.onRead(rows.size(), true, true);
 
                             return rows;
                         });
@@ -662,7 +662,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             return safeReadFuture
                     .thenCompose(unused -> scanSortedIndex(request, 
indexStorage))
                     .thenApply(rows -> {
-                        metrics.onRead(rows.size(), true);
+                        metrics.onRead(rows.size(), true, true);
 
                         return rows;
                     });
@@ -672,7 +672,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 .thenCompose(
                         unused -> retrieveExactEntriesUntilCursorEmpty(txId, 
request.coordinatorId(), readTimestamp, cursorId, batchCount))
                 .thenApply(rows -> {
-                    metrics.onRead(rows.size(), true);
+                    metrics.onRead(rows.size(), true, true);
 
                     return rows;
                 });
@@ -1683,13 +1683,13 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
             // Nothing found in the storage, return null.
             if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
-                metrics.onRead(true);
+                metrics.onRead(true, false);
 
                 return nullCompletedFuture();
             }
 
             if (writeIntents.isEmpty()) {
-                metrics.onRead(true);
+                metrics.onRead(true, true);
 
                 // No write intents, then return the committed value. We 
already know that regularEntries is not empty.
                 return completedFuture(regularEntries.get(0).binaryRow());
@@ -1704,7 +1704,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                         resolveWriteIntentReadability(writeIntent, ts)
                                 .thenApply(writeIntentReadable ->
                                         inBusyLock(busyLock, () -> {
-                                            metrics.onRead(true);
+                                            metrics.onRead(true, true);
 
                                             if (writeIntentReadable) {
                                                 return findAny(writeIntents, 
wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
@@ -1861,7 +1861,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     }
 
                     if (rowIdsToDelete.isEmpty()) {
-                        metrics.onRead(searchRows.size(), false);
+                        metrics.onRead(searchRows.size(), false, false);
 
                         return completedFuture(new ReplicaResult(result, 
null));
                     }
@@ -1877,8 +1877,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                             )
                             .thenApply(res -> {
-                                metrics.onRead(searchRows.size(), false);
-                                metrics.onWrite(rowIdsToDelete.size());
+                                metrics.onRead(searchRows.size(), false, true);
+                                metrics.onRemoval(rowIdsToDelete.size());
 
                                 return new ReplicaResult(result, res);
                             });
@@ -1917,8 +1917,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     }
 
                     if (rowsToInsert.isEmpty()) {
-                        metrics.onRead(searchRows.size(), false);
-
+                        metrics.onRead(searchRows.size(), false, false);
                         return completedFuture(new ReplicaResult(result, 
null));
                     }
 
@@ -1951,7 +1950,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                             )
                             .thenApply(res -> {
-                                metrics.onRead(searchRows.size(), false);
+                                metrics.onRead(searchRows.size(), false, true);
                                 metrics.onWrite(rowsToInsert.size());
 
                                 // Release short term locks.
@@ -2056,8 +2055,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     }
 
                     if (rowsToUpdate.isEmpty()) {
-                        metrics.onRead(uniqueKeysCountFinal, false);
-
                         return completedFuture(new ReplicaResult(null, null));
                     }
 
@@ -2072,7 +2069,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                             )
                             .thenApply(res -> {
-                                metrics.onRead(uniqueKeysCountFinal, false);
                                 metrics.onWrite(uniqueKeysCountFinal);
 
                                 // Release short term locks.
@@ -2134,15 +2130,24 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                 result.add(rowFut.join());
                             }
 
-                            if (allElementsAreNull(result)) {
-                                metrics.onRead(result.size(), false);
+                            int hits = 0;
+                            for (BinaryRow row : result) {
+                                if (row != null) {
+                                    hits++;
+                                }
+                            }
+
+                            if (hits == 0) {
+                                metrics.onRead(result.size(), false, false);
 
                                 return completedFuture(new 
ReplicaResult(result, null));
                             }
 
+                            int finalHits = hits;
                             return 
validateRwReadAgainstSchemaAfterTakingLocks(txId)
                                     .thenApply(unused -> {
-                                        metrics.onRead(result.size(), false);
+                                        metrics.onRead(result.size() - 
finalHits, false, false);
+                                        metrics.onRead(finalHits, false, true);
 
                                         return new ReplicaResult(result, null);
                                     });
@@ -2208,7 +2213,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                             )
                             .thenApply(res -> {
-                                metrics.onWrite(rowIdsToDelete.size());
+                                metrics.onRemoval(rowIdsToDelete.size());
 
                                 return new ReplicaResult(result, res);
                             });
@@ -2747,7 +2752,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             case RW_DELETE_EXACT: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
-                        metrics.onRead(false);
+                        metrics.onRead(false, false);
 
                         return completedFuture(new ReplicaResult(false, null));
                     }
@@ -2755,7 +2760,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                             .thenCompose(validatedRowId -> {
                                 if (validatedRowId == null) {
-                                    metrics.onRead(false);
+                                    metrics.onRead(false, false);
 
                                     return completedFuture(new 
ReplicaResult(false, null));
                                 }
@@ -2773,8 +2778,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                                 )
                                         )
                                         .thenApply(res -> {
-                                            metrics.onRead(false);
-                                            metrics.onWrite();
+                                            metrics.onRead(false, true);
+                                            metrics.onRemoval();
 
                                             return new ReplicaResult(true, 
res);
                                         });
@@ -2784,7 +2789,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             case RW_INSERT: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId != null) {
-                        metrics.onRead(false);
+                        metrics.onRead(false, true);
 
                         return completedFuture(new ReplicaResult(false, null));
                     }
@@ -2805,7 +2810,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
-                                metrics.onRead(false);
+                                metrics.onRead(false, true);
                                 metrics.onWrite();
 
                                 // Release short term locks.
@@ -2874,7 +2879,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
-                                metrics.onRead(false);
+                                metrics.onRead(false, true);
                                 metrics.onWrite();
 
                                 // Release short term locks.
@@ -2887,7 +2892,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             case RW_GET_AND_REPLACE: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
-                        metrics.onRead(false);
+                        metrics.onRead(false, false);
 
                         return completedFuture(new ReplicaResult(null, null));
                     }
@@ -2907,7 +2912,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
-                                metrics.onRead(false);
+                                metrics.onRead(false, rowId != null);
+
                                 metrics.onWrite();
 
                                 // Release short term locks.
@@ -2920,7 +2926,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             case RW_REPLACE_IF_EXIST: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
-                        metrics.onRead(false);
+                        metrics.onRead(false, false);
 
                         return completedFuture(new ReplicaResult(false, null));
                     }
@@ -2940,7 +2946,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
-                                metrics.onRead(false);
+                                metrics.onRead(false, false);
                                 metrics.onWrite();
 
                                 // Release short term locks.
@@ -2976,7 +2982,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             case RW_GET: {
                 return resolveRowByPk(primaryKey, txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
-                        metrics.onRead(false);
+                        metrics.onRead(false, false);
 
                         return nullCompletedFuture();
                     }
@@ -2984,7 +2990,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     return takeLocksForGet(rowId, txId)
                             .thenCompose(ignored -> 
validateRwReadAgainstSchemaAfterTakingLocks(txId))
                             .thenApply(ignored -> {
-                                metrics.onRead(false);
+                                metrics.onRead(false, true);
 
                                 return new ReplicaResult(row, null);
                             });
@@ -3014,7 +3020,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                             )
                             .thenApply(res -> {
-                                metrics.onWrite();
+                                metrics.onRemoval();
 
                                 return new ReplicaResult(true, res);
                             });
@@ -3023,7 +3029,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             case RW_GET_AND_DELETE: {
                 return resolveRowByPk(primaryKey, txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
-                        metrics.onRead(false);
+                        metrics.onRead(false, false);
 
                         return nullCompletedFuture();
                     }
@@ -3046,8 +3052,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                             )
                             .thenApply(res -> {
-                                metrics.onRead(false);
-                                metrics.onWrite();
+                                metrics.onRead(false, true);
+                                metrics.onRemoval();
 
                                 return new ReplicaResult(row, res);
                             });
@@ -3265,7 +3271,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         if (request.requestType() == RW_REPLACE) {
             return resolveRowByPk(extractPk(newRow), txId, (rowId, row, 
lastCommitTime) -> {
                 if (rowId == null) {
-                    metrics.onRead(false);
+                    metrics.onRead(false, false);
 
                     return completedFuture(new ReplicaResult(false, null));
                 }
@@ -3273,7 +3279,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 return takeLocksForReplace(expectedRow, row, newRow, rowId, 
txId)
                         .thenCompose(rowIdLock -> {
                             if (rowIdLock == null) {
-                                metrics.onRead(false);
+                                metrics.onRead(false, false);
 
                                 return completedFuture(new 
ReplicaResult(false, null));
                             }
@@ -3296,7 +3302,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock))
                                     .thenApply(tuple -> {
-                                        metrics.onRead(false);
+                                        metrics.onRead(false, true);
                                         metrics.onWrite();
 
                                         // Release short term locks.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 320c2193d73..fc5cfc416bd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -122,7 +122,7 @@ import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TxContext;
 import 
org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionIds;
@@ -198,7 +198,7 @@ public class InternalTableImpl implements InternalTable {
     /** Default read-only transaction timeout. */
     private final Supplier<Long> defaultReadTxTimeout;
 
-    private final TableMetricSource metrics;
+    private final ReadWriteMetricSource metrics;
 
     /**
      * Constructor.
@@ -236,7 +236,7 @@ public class InternalTableImpl implements InternalTable {
             StreamerReceiverRunner streamerReceiverRunner,
             Supplier<Long> defaultRwTxTimeout,
             Supplier<Long> defaultReadTxTimeout,
-            TableMetricSource metrics
+            ReadWriteMetricSource metrics
     ) {
         this.tableName = tableName;
         this.zoneId = zoneId;
@@ -2323,7 +2323,7 @@ public class InternalTableImpl implements InternalTable {
     }
 
     @Override
-    public TableMetricSource metrics() {
+    public ReadWriteMetricSource metrics() {
         return metrics;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/ReadWriteMetricSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/ReadWriteMetricSource.java
new file mode 100644
index 00000000000..687baa52ca5
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/ReadWriteMetricSource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import org.apache.ignite.internal.metrics.MetricSource;
+
+/** Common interface for reads and writes to tables and caches. */
+public interface ReadWriteMetricSource extends MetricSource {
+    /**
+     * Called after get request.
+     *
+     * @param readOnly {@code true} if read operation is executed within 
read-only transaction, and {@code false} otherwise.
+     * @param hit {@code true} if row was found, {@code false} otherwise.
+     */
+    void onRead(boolean readOnly, boolean hit);
+
+    /**
+     * Called after get request for multiple rows.
+     *
+     * @param readOnly {@code true} if read operation is executed within 
read-only transaction, and {@code false} otherwise.
+     * @param hit {@code true} if row was found, {@code false} otherwise.
+     */
+    void onRead(int x, boolean readOnly, boolean hit);
+
+    /**
+     * Increments a counter of writes.
+     */
+    void onWrite();
+
+    /**
+     * Adds the given {@code x} to a counter of writes.
+     */
+    void onWrite(int x);
+
+    /**
+     * Should be called instead of {@link #onWrite} if row was removed.
+     */
+    void onRemoval();
+
+    /**
+     * Should be called instead of {@link #onWrite(int)} if {@code x} rows were removed.
+     */
+    void onRemoval(int x);
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
index a8800ee4c52..4c5bde703f4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
@@ -107,7 +107,7 @@ import org.apache.ignite.table.QualifiedName;
  *
  * <i>Note: Only synchronous methods are listed. Asynchronous methods affect 
the same metrics.</i>
  */
-public class TableMetricSource extends AbstractMetricSource<Holder> {
+public class TableMetricSource extends AbstractMetricSource<Holder> implements 
ReadWriteMetricSource {
     /** Source name. */
     public static final String SOURCE_NAME = "tables";
 
@@ -147,29 +147,17 @@ public class TableMetricSource extends 
AbstractMetricSource<Holder> {
         return tableName;
     }
 
-    /**
-     * Increments a counter of reads.
-     *
-     * @param readOnly {@code true} if read operation is executed within 
read-only transaction, and {@code false} otherwise.
-     */
-    public void onRead(boolean readOnly) {
-        Holder holder = holder();
+    @Override
+    public void onRead(boolean readOnly, boolean hit) {
+        onRead(1, readOnly);
+    }
 
-        if (holder != null) {
-            if (readOnly) {
-                holder.roReads.increment();
-            } else {
-                holder.rwReads.increment();
-            }
-        }
+    @Override
+    public void onRead(int x, boolean readOnly, boolean hit) {
+        onRead(x, readOnly);
     }
 
-    /**
-     * Adds the given {@code x} to a counter of reads.
-     *
-     * @param readOnly {@code true} if read operation is executed within 
read-only transaction, and {@code false} otherwise.
-     */
-    public void onRead(int x, boolean readOnly) {
+    private void onRead(int x, boolean readOnly) {
         Holder holder = holder();
 
         if (holder != null) {
@@ -181,9 +169,7 @@ public class TableMetricSource extends 
AbstractMetricSource<Holder> {
         }
     }
 
-    /**
-     * Increments a counter of writes.
-     */
+    @Override
     public void onWrite() {
         Holder holder = holder();
 
@@ -192,9 +178,7 @@ public class TableMetricSource extends 
AbstractMetricSource<Holder> {
         }
     }
 
-    /**
-     * Adds the given {@code x} to a counter of writes.
-     */
+    @Override
     public void onWrite(int x) {
         Holder holder = holder();
 
@@ -203,6 +187,16 @@ public class TableMetricSource extends 
AbstractMetricSource<Holder> {
         }
     }
 
+    @Override
+    public void onRemoval() {
+        onWrite();
+    }
+
+    @Override
+    public void onRemoval(int x) {
+        onWrite(x);
+    }
+
     @Override
     protected Holder createHolder() {
         return new Holder();


Reply via email to