This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e0f754860 [core] Try to avoid NPE for 
SplitEnumeratorContext.metricGroup
e0f754860 is described below

commit e0f7548600221a926c87b1d66cb21e7b9bb11838
Author: Jingsong <[email protected]>
AuthorDate: Tue Dec 19 19:20:45 2023 +0800

    [core] Try to avoid NPE for SplitEnumeratorContext.metricGroup
---
 .../paimon/flink/source/ContinuousFileStoreSource.java      | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 9691a8df5..6b5b5f4ab 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
 
 import javax.annotation.Nullable;
 
@@ -75,7 +76,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
             splits = checkpoint.splits();
         }
         StreamTableScan scan = readBuilder.newStreamScan();
-        if (context.metricGroup() != null) {
+        if (metricGroup(context) != null) {
             ((InnerStreamTableScan) scan)
                     .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
         }
@@ -83,6 +84,16 @@ public class ContinuousFileStoreSource extends FlinkSource {
         return buildEnumerator(context, splits, nextSnapshotId, scan);
     }
 
+    @Nullable
+    private SplitEnumeratorMetricGroup metricGroup(SplitEnumeratorContext<?> 
context) {
+        try {
+            return context.metricGroup();
+        } catch (NullPointerException ignore) {
+            // ignore NPE for some Flink versions
+            return null;
+        }
+    }
+
     protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> 
buildEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             Collection<FileStoreSourceSplit> splits,

Reply via email to