This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e0f754860 [core] Try to avoid NPE for
SplitEnumeratorContext.metricGroup
e0f754860 is described below
commit e0f7548600221a926c87b1d66cb21e7b9bb11838
Author: Jingsong <[email protected]>
AuthorDate: Tue Dec 19 19:20:45 2023 +0800
[core] Try to avoid NPE for SplitEnumeratorContext.metricGroup
---
.../paimon/flink/source/ContinuousFileStoreSource.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 9691a8df5..6b5b5f4ab 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.source.StreamTableScan;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import javax.annotation.Nullable;
@@ -75,7 +76,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
splits = checkpoint.splits();
}
StreamTableScan scan = readBuilder.newStreamScan();
- if (context.metricGroup() != null) {
+ if (metricGroup(context) != null) {
((InnerStreamTableScan) scan)
.withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
}
@@ -83,6 +84,16 @@ public class ContinuousFileStoreSource extends FlinkSource {
return buildEnumerator(context, splits, nextSnapshotId, scan);
}
+ @Nullable
+ private SplitEnumeratorMetricGroup metricGroup(SplitEnumeratorContext<?>
context) {
+ try {
+ return context.metricGroup();
+ } catch (NullPointerException ignore) {
+ // ignore NPE for some Flink versions
+ return null;
+ }
+ }
+
protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint>
buildEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits,