This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 701cf88c33 [core] Make with MetricRegistry public (#5578)
701cf88c33 is described below

commit 701cf88c332b1449a352742cef013c589b0d975f
Author: JingsongLi <[email protected]>
AuthorDate: Thu May 8 13:48:25 2025 +0800

    [core] Make with MetricRegistry public (#5578)
---
 .../org/apache/paimon/metrics/MetricRegistry.java   | 21 ++++++++++++---------
 .../paimon/operation/metrics/CommitMetrics.java     |  2 +-
 .../paimon/operation/metrics/CompactionMetrics.java |  2 +-
 .../paimon/operation/metrics/ScanMetrics.java       |  2 +-
 .../operation/metrics/WriterBufferMetric.java       |  2 +-
 .../paimon/table/AppendOnlyFileStoreTable.java      |  2 +-
 .../paimon/table/FallbackReadFileStoreTable.java    |  6 +++---
 .../apache/paimon/table/sink/InnerTableCommit.java  |  1 +
 .../org/apache/paimon/table/sink/TableCommit.java   |  4 ++++
 .../org/apache/paimon/table/sink/TableWrite.java    |  2 +-
 .../paimon/table/source/AbstractDataTableRead.java  |  2 +-
 .../paimon/table/source/AbstractDataTableScan.java  |  2 +-
 .../apache/paimon/table/source/InnerTableRead.java  |  6 ++++++
 .../apache/paimon/table/source/InnerTableScan.java  |  3 ++-
 .../paimon/table/source/KeyValueTableRead.java      |  2 +-
 .../org/apache/paimon/table/source/TableRead.java   |  4 ++++
 .../org/apache/paimon/table/source/TableScan.java   |  4 ++++
 .../apache/paimon/table/system/AuditLogTable.java   |  8 ++++----
 .../org/apache/paimon/metrics/MetricGroupTest.java  |  4 ++--
 .../apache/paimon/metrics/TestMetricRegistry.java   |  4 ++--
 .../paimon/flink/lookup/LookupCompactDiffRead.java  |  3 +--
 .../paimon/flink/metrics/FlinkMetricRegistry.java   |  4 ++--
 .../flink/source/ContinuousFileStoreSource.java     |  2 +-
 .../org/apache/paimon/flink/source/FlinkSource.java | 10 ++++++++--
 .../paimon/flink/source/LogHybridSourceFactory.java |  2 +-
 .../paimon/flink/source/StaticFileStoreSource.java  |  2 +-
 .../source/ContinuousFileSplitEnumeratorTest.java   |  6 ++++++
 .../org/apache/paimon/spark/PaimonBaseScan.scala    |  2 +-
 .../paimon/spark/metric/SparkMetricRegistry.scala   |  2 +-
 29 files changed, 75 insertions(+), 41 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java 
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
index 25bcac4cbb..ce5f80c384 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
@@ -18,21 +18,24 @@
 
 package org.apache.paimon.metrics;
 
+import org.apache.paimon.annotation.Public;
+
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-/** Factory to create {@link MetricGroup}s. */
-public abstract class MetricRegistry {
-
-    private static final String KEY_TABLE = "table";
+/**
+ * Factory to create {@link MetricGroup}s.
+ *
+ * @since 1.2.0
+ */
+@Public
+public interface MetricRegistry {
 
-    public MetricGroup tableMetricGroup(String groupName, String tableName) {
+    default MetricGroup createTableMetricGroup(String groupName, String tableName) {
         Map<String, String> variables = new LinkedHashMap<>();
-        variables.put(KEY_TABLE, tableName);
-
+        variables.put("table", tableName);
         return createMetricGroup(groupName, variables);
     }
 
-    protected abstract MetricGroup createMetricGroup(
-            String groupName, Map<String, String> variables);
+    MetricGroup createMetricGroup(String groupName, Map<String, String> variables);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index 0f8ccbc65c..e64fc7be16 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -32,7 +32,7 @@ public class CommitMetrics {
     private final MetricGroup metricGroup;
 
     public CommitMetrics(MetricRegistry registry, String tableName) {
-        this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+        this.metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
         registerGenericCommitMetrics();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index 57d8e2bac0..67d16a895a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -62,7 +62,7 @@ public class CompactionMetrics {
     private Counter compactionsQueuedCounter;
 
     public CompactionMetrics(MetricRegistry registry, String tableName) {
-        this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+        this.metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
         this.reporters = new HashMap<>();
         this.compactTimers = new ConcurrentHashMap<>();
         this.compactionTimes = new ConcurrentLinkedQueue<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index be6b514e04..280f53a101 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -43,7 +43,7 @@ public class ScanMetrics {
     private ScanStats latestScan;
 
     public ScanMetrics(MetricRegistry registry, String tableName) {
-        metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+        metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
         metricGroup.gauge(
                 LAST_SCAN_DURATION, () -> latestScan == null ? 0L : latestScan.getDuration());
         durationHistogram = metricGroup.histogram(SCAN_DURATION, HISTOGRAM_WINDOW_SIZE);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index d414383aa5..e4c676ab1e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -42,7 +42,7 @@ public class WriterBufferMetric {
             Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
             MetricRegistry metricRegistry,
             String tableName) {
-        metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+        metricGroup = metricRegistry.createTableMetricGroup(GROUP_NAME, tableName);
         numWriters = new AtomicInteger(0);
         metricGroup.gauge(NUM_WRITERS, numWriters::get);
         metricGroup.gauge(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 1fece2596f..5c4e4a6a13 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -108,7 +108,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     @Override
     public InnerTableRead newRead() {
         RawFileSplitRead read = store().newRead();
-        return new AbstractDataTableRead<InternalRow>(schema()) {
+        return new AbstractDataTableRead(schema()) {
 
             @Override
             protected InnerTableRead innerWithFilter(Predicate predicate) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index dddc8ca439..33c7b2c472 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -296,9 +296,9 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         }
 
         @Override
-        public Scan withMetricsRegistry(MetricRegistry metricRegistry) {
-            mainScan.withMetricsRegistry(metricRegistry);
-            fallbackScan.withMetricsRegistry(metricRegistry);
+        public Scan withMetricRegistry(MetricRegistry metricRegistry) {
+            mainScan.withMetricRegistry(metricRegistry);
+            fallbackScan.withMetricRegistry(metricRegistry);
             return this;
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 8355575d8d..1544375569 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -44,5 +44,6 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
      */
     InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
 
+    @Override
     InnerTableCommit withMetricRegistry(MetricRegistry registry);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
index 8845b7fa17..26f4ffd163 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.Table;
 
 import java.util.List;
@@ -35,6 +36,9 @@ import java.util.List;
 @Public
 public interface TableCommit extends AutoCloseable {
 
+    /** Set {@link MetricRegistry} to table commit. */
+    TableCommit withMetricRegistry(MetricRegistry registry);
+
     /** Abort an unsuccessful commit. The data files will be deleted. */
     void abort(List<CommitMessage> commitMessages);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 9c7369fb22..57892eff29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -76,6 +76,6 @@ public interface TableWrite extends AutoCloseable {
      */
     void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception;
 
-    /** With metrics to measure compaction. */
+    /** Set {@link MetricRegistry} to table write. */
     TableWrite withMetricRegistry(MetricRegistry registry);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 7f051325e8..da2ce8cb56 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -31,7 +31,7 @@ import java.io.IOException;
 import java.util.Optional;
 
 /** A {@link InnerTableRead} for data table. */
-public abstract class AbstractDataTableRead<T> implements InnerTableRead {
+public abstract class AbstractDataTableRead implements InnerTableRead {
 
     private final DefaultValueAssigner defaultValueAssigner;
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index c6de102581..9c103badad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -113,7 +113,7 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
-    public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
+    public AbstractDataTableScan withMetricRegistry(MetricRegistry metricsRegistry) {
         snapshotReader.withMetricRegistry(metricsRegistry);
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index a8f1890674..3a8ceeb353 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.types.RowType;
@@ -57,4 +58,9 @@ public interface InnerTableRead extends TableRead {
     default TableRead executeFilter() {
         return this;
     }
+
+    @Override
+    default InnerTableRead withMetricRegistry(MetricRegistry registry) {
+        return this;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 1c7b153184..26168bba2a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -59,7 +59,8 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
-    default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
+    @Override
+    default InnerTableScan withMetricRegistry(MetricRegistry metricRegistry) {
         // do nothing, should implement this if need
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 7bfbe5fd3e..2c2b6c280f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -46,7 +46,7 @@ import java.util.function.Supplier;
 /**
  * An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}.
  */
-public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
+public final class KeyValueTableRead extends AbstractDataTableRead {
 
     private final List<SplitReadProvider> readProviders;
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 7b78d5aede..68d5a0f1f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
@@ -38,6 +39,9 @@ import java.util.List;
 @Public
 public interface TableRead {
 
+    /** Set {@link MetricRegistry} to table read. */
+    TableRead withMetricRegistry(MetricRegistry registry);
+
     TableRead executeFilter();
 
     TableRead withIOManager(IOManager ioManager);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index a5eae757ef..f3c667f19e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.Table;
 
 import java.util.List;
@@ -34,6 +35,9 @@ import java.util.stream.Collectors;
 @Public
 public interface TableScan {
 
+    /** Set {@link MetricRegistry} to table scan. */
+    TableScan withMetricRegistry(MetricRegistry registry);
+
     /** Plan splits, throws {@link EndOfScanException} if the scan is ended. */
     Plan plan();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index eabb1a6120..66b55b712d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -450,8 +450,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public InnerTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
-            batchScan.withMetricsRegistry(metricsRegistry);
+        public InnerTableScan withMetricRegistry(MetricRegistry metricsRegistry) {
+            batchScan.withMetricRegistry(metricsRegistry);
             return this;
         }
 
@@ -565,8 +565,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public StreamDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
-            streamScan.withMetricsRegistry(metricsRegistry);
+        public StreamDataTableScan withMetricRegistry(MetricRegistry metricsRegistry) {
+            streamScan.withMetricRegistry(metricsRegistry);
             return this;
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java 
b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
index 90a063e324..6280ff3ccf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
@@ -30,7 +30,7 @@ public class MetricGroupTest {
     @Test
     public void testGroupRegisterMetrics() {
         TestMetricRegistry registry = new TestMetricRegistry();
-        MetricGroup group = registry.tableMetricGroup("commit", "myTable");
+        MetricGroup group = registry.createTableMetricGroup("commit", "myTable");
 
         // these will fail is the registration is propagated
         group.counter("testcounter");
@@ -51,7 +51,7 @@ public class MetricGroupTest {
     public void testTolerateMetricNameCollisions() {
         final String name = "abctestname";
         TestMetricRegistry registry = new TestMetricRegistry();
-        MetricGroup group = registry.tableMetricGroup("commit", "myTable");
+        MetricGroup group = registry.createTableMetricGroup("commit", "myTable");
 
         Counter counter = group.counter(name);
         // return the old one with the metric name collision
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java 
b/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
index 4b2870041c..88b3e6e2ff 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
@@ -21,10 +21,10 @@ package org.apache.paimon.metrics;
 import java.util.Map;
 
 /** Implementation of {@link MetricRegistry} for tests. */
-public class TestMetricRegistry extends MetricRegistry {
+public class TestMetricRegistry implements MetricRegistry {
 
     @Override
-    protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
+    public MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
         return new MetricGroupImpl(groupName, variables);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index 2a140adc32..bbc0bcda17 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.MergeFileSplitRead;
@@ -38,7 +37,7 @@ import java.io.IOException;
 import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
 
 /** An {@link InnerTableRead} that reads the data changed before and after compaction. */
-public class LookupCompactDiffRead extends AbstractDataTableRead<KeyValue> {
+public class LookupCompactDiffRead extends AbstractDataTableRead {
     private final SplitRead<InternalRow> fullPhaseMergeRead;
     private final SplitRead<InternalRow> incrementalDiffRead;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
index 8fc233dd0f..9771fd45a6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
@@ -24,7 +24,7 @@ import org.apache.paimon.metrics.MetricRegistry;
 import java.util.Map;
 
 /** {@link MetricRegistry} to create {@link FlinkMetricGroup}. */
-public class FlinkMetricRegistry extends MetricRegistry {
+public class FlinkMetricRegistry implements MetricRegistry {
 
     private final org.apache.flink.metrics.MetricGroup group;
 
@@ -33,7 +33,7 @@ public class FlinkMetricRegistry extends MetricRegistry {
     }
 
     @Override
-    protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
+    public MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
         return new FlinkMetricGroup(group, groupName, variables);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index cc33ef167c..db39f90455 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -82,7 +82,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
         StreamTableScan scan = readBuilder.newStreamScan();
         if (metricGroup(context) != null) {
             ((StreamDataTableScan) scan)
-                    .withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
+                    .withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
         }
         scan.restore(nextSnapshotId);
         return buildEnumerator(context, splits, nextSnapshotId, scan);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index 5fcfc9b379..42da33cfa7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -20,8 +20,10 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.NestedProjectedRowData;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -29,6 +31,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
@@ -59,11 +62,14 @@ public abstract class FlinkSource
     public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
         IOManager ioManager =
                 IOManager.create(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+        SourceReaderMetricGroup metricGroup = context.metricGroup();
         FileStoreSourceReaderMetrics sourceReaderMetrics =
-                new FileStoreSourceReaderMetrics(context.metricGroup());
+                new FileStoreSourceReaderMetrics(metricGroup);
+        TableRead tableRead =
+                readBuilder.newRead().withMetricRegistry(new FlinkMetricRegistry(metricGroup));
         return new FileStoreSourceReader(
                 context,
-                readBuilder.newRead(),
+                tableRead,
                 sourceReaderMetrics,
                 ioManager,
                 limit,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index bc361cdbf3..afdcae2fff 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -132,7 +132,7 @@ public class LogHybridSourceFactory
                 // register scan metrics
                 if (context.metricGroup() != null) {
                     ((StreamDataTableScan) scan)
-                            .withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
+                            .withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
                 }
                 splits = splitGenerator.createSplits(scan.plan());
                 Long nextSnapshotId = scan.checkpoint();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 624f543481..01aa464d7b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -93,7 +93,7 @@ public class StaticFileStoreSource extends FlinkSource {
         // register scan metrics
         if (context.metricGroup() != null) {
             ((InnerTableScan) scan)
-                    .withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
+                    .withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
         }
         return splitGenerator.createSplits(scan.plan());
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index aca2d43e16..6f05e5dd7d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
@@ -895,6 +896,11 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
             this.nextSnapshotIdForConsumer = null;
         }
 
+        @Override
+        public TableScan withMetricRegistry(MetricRegistry registry) {
+            return this;
+        }
+
         @Override
         public Plan plan() {
             Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 5e790cb301..74741f5364 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -68,7 +68,7 @@ abstract class PaimonBaseScan(
     readBuilder
       .newScan()
       .asInstanceOf[InnerTableScan]
-      .withMetricsRegistry(paimonMetricsRegistry)
+      .withMetricRegistry(paimonMetricsRegistry)
       .plan()
       .splits()
       .asScala
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
index 37ba98a4a6..3d858a623f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
@@ -32,7 +32,7 @@ case class SparkMetricRegistry() extends MetricRegistry {
 
   private val metricGroups = mutable.Map.empty[String, MetricGroup]
 
-  override protected def createMetricGroup(
+  override def createMetricGroup(
       groupName: String,
       variables: util.Map[String, String]): MetricGroup = {
     val metricGroup = new MetricGroupImpl(groupName, variables)

Reply via email to