This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 617244fdd9f [FLINK-38738][table] Expose more context in table connectors to correct UID
617244fdd9f is described below

commit 617244fdd9fa6edc2907020f2b57be16f6cc7cb3
Author: Timo Walther <[email protected]>
AuthorDate: Fri Dec 19 16:50:05 2025 +0100

    [FLINK-38738][table] Expose more context in table connectors to correct UID
    
    This closes #27352.
---
 .../org/apache/flink/table/connector/ProviderContext.java  | 10 ++++++++++
 .../flink/table/planner/plan/nodes/exec/ExecNode.java      | 14 ++++++++++++++
 .../flink/table/planner/plan/nodes/exec/ExecNodeBase.java  | 10 ++++++++++
 .../planner/plan/nodes/exec/common/CommonExecSink.java     |  5 +++++
 .../planner/plan/nodes/exec/TestingBatchExecNode.java      |  5 +++++
 .../table/planner/plan/nodes/exec/TransformationsTest.java | 11 ++++++++---
 6 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
index 3f806d2d73f..dddb5e06db9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
@@ -61,6 +61,16 @@ public interface ProviderContext {
      */
     Optional<String> generateUid(String name);
 
+    /**
+     * Returns the framework's node type in which this connector is embedded.
+     *
+     * <p>In other words: It returns the ExecNode's name and version as contained in the compiled
+     * plan. For example, "stream-exec-table-source-scan_2" or "stream-exec-sink_1".
+     */
+    default String getContainerNodeType() {
+        return "";
+    }
+
     /**
      * Returns the display name provided by the framework to label the connector.
      *
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 411b294aafe..321e3286607 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -65,12 +65,26 @@ public interface ExecNode<T> extends ExecNodeTranslator<T>, FusionCodegenExecNod
     @JsonProperty(value = FIELD_NAME_ID, index = 0)
     int getId();
 
+    /**
+     * The node's type as contained in {@link CompiledPlan} (e.g. "stream-exec-table-source-scan_2"
+     * consisting of name and version).
+     *
+     * <p>A new type including its version can be added by declaring a {@link ExecNodeMetadata}
+     * annotation.
+     *
+     * @see ExecNodeContext#getTypeAsString()
+     */
+    @JsonIgnore
+    String getTypeAsString();
+
     /**
      * The version of the node.
      *
      * <p>A new version can be added by declaring a {@link ExecNodeMetadata} annotation, potentially
      * by copying the old annotation. You can use this method to get the current compiled version
      * and execute version-specific logic accordingly.
+     *
+     * @see ExecNodeContext#getVersion()
      */
     @JsonIgnore
     int getVersion();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 7a9d940004c..92d9f8fbd95 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -131,6 +131,11 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
         return context.getId();
     }
 
+    @Override
+    public final String getTypeAsString() {
+        return context.getTypeAsString();
+    }
+
     @Override
     public final int getVersion() {
         return context.getVersion();
@@ -345,6 +350,11 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
                 return Optional.empty();
             }
 
+            @Override
+            public String getContainerNodeType() {
+                return getTypeAsString();
+            }
+
             @Override
             public String getName() {
                 return metadata.getName();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index ad333589232..0fc273ca684 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -431,6 +431,11 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                                 return providerContext.generateUid(name);
                             }
 
+                            @Override
+                            public String getContainerNodeType() {
+                                return providerContext.getContainerNodeType();
+                            }
+
                             @Override
                             public String getName() {
                                 return providerContext.getName();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
index 348da51c1d9..13d887633ee 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
@@ -66,6 +66,11 @@ public class TestingBatchExecNode implements BatchExecNode<RowData> {
         return 0;
     }
 
+    @Override
+    public String getTypeAsString() {
+        return "";
+    }
+
     @Override
     public int getVersion() {
         return 0;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 49609c1d439..4b970d83b30 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -413,7 +413,7 @@ class TransformationsTest {
     }
 
     private static CompiledPlan planFromFlink2_2MultiTransformSource(TableEnvironment env) {
-        createMultiTransformSource(env);
+        createMultiTransformSource(env, "stream-exec-table-source-scan_1");
         // plan content is compiled from
         // planFromCurrentFlinkMultiTransformSource() using Flink release-2.2
         return env.loadPlan(
@@ -421,13 +421,14 @@ class TransformationsTest {
     }
 
     private static CompiledPlan planFromCurrentFlinkMultiTransformSource(TableEnvironment env) {
-        createMultiTransformSource(env);
+        createMultiTransformSource(env, "stream-exec-table-source-scan_2");
         return env.from("T")
                 .insertInto(TableDescriptor.forConnector("blackhole").build())
                 .compilePlan();
     }
 
-    private static void createMultiTransformSource(TableEnvironment env) {
+    private static void createMultiTransformSource(
+            TableEnvironment env, String expectedSourceExecNode) {
         final DataStreamScanProvider scanProvider =
                 new DataStreamScanProvider() {
                     @Override
@@ -438,6 +439,10 @@ class TransformationsTest {
                     @Override
                     public DataStream<RowData> produceDataStream(
                             ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+
+                        assertThat(providerContext.getContainerNodeType())
+                                .isEqualTo(expectedSourceExecNode);
+
                         // UID 1
                         final SingleOutputStreamOperator<Integer> ints = execEnv.fromData(1, 2, 3);
                         providerContext.generateUid("my-source").ifPresent(ints::uid);

Reply via email to