This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 701cf88c33 [core] Make with MetricRegistry public (#5578)
701cf88c33 is described below
commit 701cf88c332b1449a352742cef013c589b0d975f
Author: JingsongLi <[email protected]>
AuthorDate: Thu May 8 13:48:25 2025 +0800
[core] Make with MetricRegistry public (#5578)
---
.../org/apache/paimon/metrics/MetricRegistry.java | 21 ++++++++++++---------
.../paimon/operation/metrics/CommitMetrics.java | 2 +-
.../paimon/operation/metrics/CompactionMetrics.java | 2 +-
.../paimon/operation/metrics/ScanMetrics.java | 2 +-
.../operation/metrics/WriterBufferMetric.java | 2 +-
.../paimon/table/AppendOnlyFileStoreTable.java | 2 +-
.../paimon/table/FallbackReadFileStoreTable.java | 6 +++---
.../apache/paimon/table/sink/InnerTableCommit.java | 1 +
.../org/apache/paimon/table/sink/TableCommit.java | 4 ++++
.../org/apache/paimon/table/sink/TableWrite.java | 2 +-
.../paimon/table/source/AbstractDataTableRead.java | 2 +-
.../paimon/table/source/AbstractDataTableScan.java | 2 +-
.../apache/paimon/table/source/InnerTableRead.java | 6 ++++++
.../apache/paimon/table/source/InnerTableScan.java | 3 ++-
.../paimon/table/source/KeyValueTableRead.java | 2 +-
.../org/apache/paimon/table/source/TableRead.java | 4 ++++
.../org/apache/paimon/table/source/TableScan.java | 4 ++++
.../apache/paimon/table/system/AuditLogTable.java | 8 ++++----
.../org/apache/paimon/metrics/MetricGroupTest.java | 4 ++--
.../apache/paimon/metrics/TestMetricRegistry.java | 4 ++--
.../paimon/flink/lookup/LookupCompactDiffRead.java | 3 +--
.../paimon/flink/metrics/FlinkMetricRegistry.java | 4 ++--
.../flink/source/ContinuousFileStoreSource.java | 2 +-
.../org/apache/paimon/flink/source/FlinkSource.java | 10 ++++++++--
.../paimon/flink/source/LogHybridSourceFactory.java | 2 +-
.../paimon/flink/source/StaticFileStoreSource.java | 2 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 6 ++++++
.../org/apache/paimon/spark/PaimonBaseScan.scala | 2 +-
.../paimon/spark/metric/SparkMetricRegistry.scala | 2 +-
29 files changed, 75 insertions(+), 41 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
index 25bcac4cbb..ce5f80c384 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
@@ -18,21 +18,24 @@
package org.apache.paimon.metrics;
+import org.apache.paimon.annotation.Public;
+
import java.util.LinkedHashMap;
import java.util.Map;
-/** Factory to create {@link MetricGroup}s. */
-public abstract class MetricRegistry {
-
- private static final String KEY_TABLE = "table";
+/**
+ * Factory to create {@link MetricGroup}s.
+ *
+ * @since 1.2.0
+ */
+@Public
+public interface MetricRegistry {
- public MetricGroup tableMetricGroup(String groupName, String tableName) {
+ default MetricGroup createTableMetricGroup(String groupName, String tableName) {
Map<String, String> variables = new LinkedHashMap<>();
- variables.put(KEY_TABLE, tableName);
-
+ variables.put("table", tableName);
return createMetricGroup(groupName, variables);
}
- protected abstract MetricGroup createMetricGroup(
- String groupName, Map<String, String> variables);
+ MetricGroup createMetricGroup(String groupName, Map<String, String> variables);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index 0f8ccbc65c..e64fc7be16 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -32,7 +32,7 @@ public class CommitMetrics {
private final MetricGroup metricGroup;
public CommitMetrics(MetricRegistry registry, String tableName) {
- this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+ this.metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
registerGenericCommitMetrics();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index 57d8e2bac0..67d16a895a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -62,7 +62,7 @@ public class CompactionMetrics {
private Counter compactionsQueuedCounter;
public CompactionMetrics(MetricRegistry registry, String tableName) {
- this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+ this.metricGroup = registry.createTableMetricGroup(GROUP_NAME,
tableName);
this.reporters = new HashMap<>();
this.compactTimers = new ConcurrentHashMap<>();
this.compactionTimes = new ConcurrentLinkedQueue<>();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index be6b514e04..280f53a101 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -43,7 +43,7 @@ public class ScanMetrics {
private ScanStats latestScan;
public ScanMetrics(MetricRegistry registry, String tableName) {
- metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+ metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
metricGroup.gauge(
LAST_SCAN_DURATION, () -> latestScan == null ? 0L :
latestScan.getDuration());
durationHistogram = metricGroup.histogram(SCAN_DURATION,
HISTOGRAM_WINDOW_SIZE);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index d414383aa5..e4c676ab1e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -42,7 +42,7 @@ public class WriterBufferMetric {
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
MetricRegistry metricRegistry,
String tableName) {
- metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+ metricGroup = metricRegistry.createTableMetricGroup(GROUP_NAME,
tableName);
numWriters = new AtomicInteger(0);
metricGroup.gauge(NUM_WRITERS, numWriters::get);
metricGroup.gauge(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 1fece2596f..5c4e4a6a13 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -108,7 +108,7 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
public InnerTableRead newRead() {
RawFileSplitRead read = store().newRead();
- return new AbstractDataTableRead<InternalRow>(schema()) {
+ return new AbstractDataTableRead(schema()) {
@Override
protected InnerTableRead innerWithFilter(Predicate predicate) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index dddc8ca439..33c7b2c472 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -296,9 +296,9 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
}
@Override
- public Scan withMetricsRegistry(MetricRegistry metricRegistry) {
- mainScan.withMetricsRegistry(metricRegistry);
- fallbackScan.withMetricsRegistry(metricRegistry);
+ public Scan withMetricRegistry(MetricRegistry metricRegistry) {
+ mainScan.withMetricRegistry(metricRegistry);
+ fallbackScan.withMetricRegistry(metricRegistry);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 8355575d8d..1544375569 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -44,5 +44,6 @@ public interface InnerTableCommit extends StreamTableCommit,
BatchTableCommit {
*/
InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
+ @Override
InnerTableCommit withMetricRegistry(MetricRegistry registry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
index 8845b7fa17..26f4ffd163 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
import java.util.List;
@@ -35,6 +36,9 @@ import java.util.List;
@Public
public interface TableCommit extends AutoCloseable {
+ /** Set {@link MetricRegistry} to table commit. */
+ TableCommit withMetricRegistry(MetricRegistry registry);
+
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 9c7369fb22..57892eff29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -76,6 +76,6 @@ public interface TableWrite extends AutoCloseable {
*/
void compact(BinaryRow partition, int bucket, boolean fullCompaction)
throws Exception;
- /** With metrics to measure compaction. */
+ /** Set {@link MetricRegistry} to table write. */
TableWrite withMetricRegistry(MetricRegistry registry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 7f051325e8..da2ce8cb56 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -31,7 +31,7 @@ import java.io.IOException;
import java.util.Optional;
/** A {@link InnerTableRead} for data table. */
-public abstract class AbstractDataTableRead<T> implements InnerTableRead {
+public abstract class AbstractDataTableRead implements InnerTableRead {
private final DefaultValueAssigner defaultValueAssigner;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index c6de102581..9c103badad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -113,7 +113,7 @@ abstract class AbstractDataTableScan implements
DataTableScan {
return this;
}
- public AbstractDataTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
+ public AbstractDataTableScan withMetricRegistry(MetricRegistry
metricsRegistry) {
snapshotReader.withMetricRegistry(metricsRegistry);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index a8f1890674..3a8ceeb353 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;
@@ -57,4 +58,9 @@ public interface InnerTableRead extends TableRead {
default TableRead executeFilter() {
return this;
}
+
+ @Override
+ default InnerTableRead withMetricRegistry(MetricRegistry registry) {
+ return this;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 1c7b153184..26168bba2a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -59,7 +59,8 @@ public interface InnerTableScan extends TableScan {
return this;
}
- default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
+ @Override
+ default InnerTableScan withMetricRegistry(MetricRegistry metricRegistry) {
// do nothing, should implement this if need
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 7bfbe5fd3e..2c2b6c280f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -46,7 +46,7 @@ import java.util.function.Supplier;
/**
* An abstraction layer above {@link MergeFileSplitRead} to provide reading of
{@link InternalRow}.
*/
-public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
+public final class KeyValueTableRead extends AbstractDataTableRead {
private final List<SplitReadProvider> readProviders;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 7b78d5aede..68d5a0f1f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
@@ -38,6 +39,9 @@ import java.util.List;
@Public
public interface TableRead {
+ /** Set {@link MetricRegistry} to table read. */
+ TableRead withMetricRegistry(MetricRegistry registry);
+
TableRead executeFilter();
TableRead withIOManager(IOManager ioManager);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index a5eae757ef..f3c667f19e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
import java.util.List;
@@ -34,6 +35,9 @@ import java.util.stream.Collectors;
@Public
public interface TableScan {
+ /** Set {@link MetricRegistry} to table scan. */
+ TableScan withMetricRegistry(MetricRegistry registry);
+
/** Plan splits, throws {@link EndOfScanException} if the scan is ended. */
Plan plan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index eabb1a6120..66b55b712d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -450,8 +450,8 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
- batchScan.withMetricsRegistry(metricsRegistry);
+ public InnerTableScan withMetricRegistry(MetricRegistry
metricsRegistry) {
+ batchScan.withMetricRegistry(metricsRegistry);
return this;
}
@@ -565,8 +565,8 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public StreamDataTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
- streamScan.withMetricsRegistry(metricsRegistry);
+ public StreamDataTableScan withMetricRegistry(MetricRegistry
metricsRegistry) {
+ streamScan.withMetricRegistry(metricsRegistry);
return this;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
index 90a063e324..6280ff3ccf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
@@ -30,7 +30,7 @@ public class MetricGroupTest {
@Test
public void testGroupRegisterMetrics() {
TestMetricRegistry registry = new TestMetricRegistry();
- MetricGroup group = registry.tableMetricGroup("commit", "myTable");
+ MetricGroup group = registry.createTableMetricGroup("commit",
"myTable");
// these will fail is the registration is propagated
group.counter("testcounter");
@@ -51,7 +51,7 @@ public class MetricGroupTest {
public void testTolerateMetricNameCollisions() {
final String name = "abctestname";
TestMetricRegistry registry = new TestMetricRegistry();
- MetricGroup group = registry.tableMetricGroup("commit", "myTable");
+ MetricGroup group = registry.createTableMetricGroup("commit",
"myTable");
Counter counter = group.counter(name);
// return the old one with the metric name collision
diff --git
a/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
b/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
index 4b2870041c..88b3e6e2ff 100644
---
a/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
+++
b/paimon-core/src/test/java/org/apache/paimon/metrics/TestMetricRegistry.java
@@ -21,10 +21,10 @@ package org.apache.paimon.metrics;
import java.util.Map;
/** Implementation of {@link MetricRegistry} for tests. */
-public class TestMetricRegistry extends MetricRegistry {
+public class TestMetricRegistry implements MetricRegistry {
@Override
- protected MetricGroup createMetricGroup(String groupName, Map<String,
String> variables) {
+ public MetricGroup createMetricGroup(String groupName, Map<String, String>
variables) {
return new MetricGroupImpl(groupName, variables);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index 2a140adc32..bbc0bcda17 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.MergeFileSplitRead;
@@ -38,7 +37,7 @@ import java.io.IOException;
import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
/** An {@link InnerTableRead} that reads the data changed before and after
compaction. */
-public class LookupCompactDiffRead extends AbstractDataTableRead<KeyValue> {
+public class LookupCompactDiffRead extends AbstractDataTableRead {
private final SplitRead<InternalRow> fullPhaseMergeRead;
private final SplitRead<InternalRow> incrementalDiffRead;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
index 8fc233dd0f..9771fd45a6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
@@ -24,7 +24,7 @@ import org.apache.paimon.metrics.MetricRegistry;
import java.util.Map;
/** {@link MetricRegistry} to create {@link FlinkMetricGroup}. */
-public class FlinkMetricRegistry extends MetricRegistry {
+public class FlinkMetricRegistry implements MetricRegistry {
private final org.apache.flink.metrics.MetricGroup group;
@@ -33,7 +33,7 @@ public class FlinkMetricRegistry extends MetricRegistry {
}
@Override
- protected MetricGroup createMetricGroup(String groupName, Map<String,
String> variables) {
+ public MetricGroup createMetricGroup(String groupName, Map<String, String>
variables) {
return new FlinkMetricGroup(group, groupName, variables);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index cc33ef167c..db39f90455 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -82,7 +82,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
StreamTableScan scan = readBuilder.newStreamScan();
if (metricGroup(context) != null) {
((StreamDataTableScan) scan)
- .withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
+ .withMetricRegistry(new
FlinkMetricRegistry(context.metricGroup()));
}
scan.restore(nextSnapshotId);
return buildEnumerator(context, splits, nextSnapshotId, scan);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index 5fcfc9b379..42da33cfa7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -20,8 +20,10 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.NestedProjectedRowData;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -29,6 +31,7 @@ import
org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
@@ -59,11 +62,14 @@ public abstract class FlinkSource
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
IOManager ioManager =
IOManager.create(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+ SourceReaderMetricGroup metricGroup = context.metricGroup();
FileStoreSourceReaderMetrics sourceReaderMetrics =
- new FileStoreSourceReaderMetrics(context.metricGroup());
+ new FileStoreSourceReaderMetrics(metricGroup);
+ TableRead tableRead =
+ readBuilder.newRead().withMetricRegistry(new
FlinkMetricRegistry(metricGroup));
return new FileStoreSourceReader(
context,
- readBuilder.newRead(),
+ tableRead,
sourceReaderMetrics,
ioManager,
limit,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index bc361cdbf3..afdcae2fff 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -132,7 +132,7 @@ public class LogHybridSourceFactory
// register scan metrics
if (context.metricGroup() != null) {
((StreamDataTableScan) scan)
- .withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
+ .withMetricRegistry(new
FlinkMetricRegistry(context.metricGroup()));
}
splits = splitGenerator.createSplits(scan.plan());
Long nextSnapshotId = scan.checkpoint();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 624f543481..01aa464d7b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -93,7 +93,7 @@ public class StaticFileStoreSource extends FlinkSource {
// register scan metrics
if (context.metricGroup() != null) {
((InnerTableScan) scan)
- .withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
+ .withMetricRegistry(new
FlinkMetricRegistry(context.metricGroup()));
}
return splitGenerator.createSplits(scan.plan());
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index aca2d43e16..6f05e5dd7d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
@@ -895,6 +896,11 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
this.nextSnapshotIdForConsumer = null;
}
+ @Override
+ public TableScan withMetricRegistry(MetricRegistry registry) {
+ return this;
+ }
+
@Override
public Plan plan() {
Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 5e790cb301..74741f5364 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -68,7 +68,7 @@ abstract class PaimonBaseScan(
readBuilder
.newScan()
.asInstanceOf[InnerTableScan]
- .withMetricsRegistry(paimonMetricsRegistry)
+ .withMetricRegistry(paimonMetricsRegistry)
.plan()
.splits()
.asScala
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
index 37ba98a4a6..3d858a623f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
@@ -32,7 +32,7 @@ case class SparkMetricRegistry() extends MetricRegistry {
private val metricGroups = mutable.Map.empty[String, MetricGroup]
- override protected def createMetricGroup(
+ override def createMetricGroup(
groupName: String,
variables: util.Map[String, String]): MetricGroup = {
val metricGroup = new MetricGroupImpl(groupName, variables)