This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 617244fdd9f [FLINK-38738][table] Expose more context in table
connectors to correct UID
617244fdd9f is described below
commit 617244fdd9fa6edc2907020f2b57be16f6cc7cb3
Author: Timo Walther <[email protected]>
AuthorDate: Fri Dec 19 16:50:05 2025 +0100
[FLINK-38738][table] Expose more context in table connectors to correct UID
This closes #27352.
---
.../org/apache/flink/table/connector/ProviderContext.java | 10 ++++++++++
.../flink/table/planner/plan/nodes/exec/ExecNode.java | 14 ++++++++++++++
.../flink/table/planner/plan/nodes/exec/ExecNodeBase.java | 10 ++++++++++
.../planner/plan/nodes/exec/common/CommonExecSink.java | 5 +++++
.../planner/plan/nodes/exec/TestingBatchExecNode.java | 5 +++++
.../table/planner/plan/nodes/exec/TransformationsTest.java | 11 ++++++++---
6 files changed, 52 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
index 3f806d2d73f..dddb5e06db9 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java
@@ -61,6 +61,16 @@ public interface ProviderContext {
*/
Optional<String> generateUid(String name);
+ /**
+ * Returns the framework's node type in which this connector is embedded.
+ *
+ * <p>In other words: It returns the ExecNode's name and version as
contained in the compiled
+ * plan. For example, "stream-exec-table-source-scan_2" or
"stream-exec-sink_1".
+ */
+ default String getContainerNodeType() {
+ return "";
+ }
+
/**
* Returns the display name provided by the framework to label the
connector.
*
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 411b294aafe..321e3286607 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -65,12 +65,26 @@ public interface ExecNode<T> extends ExecNodeTranslator<T>,
FusionCodegenExecNod
@JsonProperty(value = FIELD_NAME_ID, index = 0)
int getId();
+ /**
+ * The node's type as contained in {@link CompiledPlan} (e.g.
"stream-exec-table-source-scan_2"
+ * consisting of name and version).
+ *
+ * <p>A new type including its version can be added by declaring a {@link
ExecNodeMetadata}
+ * annotation.
+ *
+ * @see ExecNodeContext#getTypeAsString()
+ */
+ @JsonIgnore
+ String getTypeAsString();
+
/**
* The version of the node.
*
* <p>A new version can be added by declaring a {@link ExecNodeMetadata}
annotation, potentially
* by copying the old annotation. You can use this method to get the
current compiled version
* and execute version-specific logic accordingly.
+ *
+ * @see ExecNodeContext#getVersion()
*/
@JsonIgnore
int getVersion();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 7a9d940004c..92d9f8fbd95 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -131,6 +131,11 @@ public abstract class ExecNodeBase<T> implements
ExecNode<T> {
return context.getId();
}
+ @Override
+ public final String getTypeAsString() {
+ return context.getTypeAsString();
+ }
+
@Override
public final int getVersion() {
return context.getVersion();
@@ -345,6 +350,11 @@ public abstract class ExecNodeBase<T> implements
ExecNode<T> {
return Optional.empty();
}
+ @Override
+ public String getContainerNodeType() {
+ return getTypeAsString();
+ }
+
@Override
public String getName() {
return metadata.getName();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index ad333589232..0fc273ca684 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -431,6 +431,11 @@ public abstract class CommonExecSink extends
ExecNodeBase<Object>
return providerContext.generateUid(name);
}
+ @Override
+ public String getContainerNodeType() {
+ return providerContext.getContainerNodeType();
+ }
+
@Override
public String getName() {
return providerContext.getName();
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
index 348da51c1d9..13d887633ee 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
@@ -66,6 +66,11 @@ public class TestingBatchExecNode implements
BatchExecNode<RowData> {
return 0;
}
+ @Override
+ public String getTypeAsString() {
+ return "";
+ }
+
@Override
public int getVersion() {
return 0;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 49609c1d439..4b970d83b30 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -413,7 +413,7 @@ class TransformationsTest {
}
private static CompiledPlan
planFromFlink2_2MultiTransformSource(TableEnvironment env) {
- createMultiTransformSource(env);
+ createMultiTransformSource(env, "stream-exec-table-source-scan_1");
// plan content is compiled from
// planFromCurrentFlinkMultiTransformSource() using Flink release-2.2
return env.loadPlan(
@@ -421,13 +421,14 @@ class TransformationsTest {
}
private static CompiledPlan
planFromCurrentFlinkMultiTransformSource(TableEnvironment env) {
- createMultiTransformSource(env);
+ createMultiTransformSource(env, "stream-exec-table-source-scan_2");
return env.from("T")
.insertInto(TableDescriptor.forConnector("blackhole").build())
.compilePlan();
}
- private static void createMultiTransformSource(TableEnvironment env) {
+ private static void createMultiTransformSource(
+ TableEnvironment env, String expectedSourceExecNode) {
final DataStreamScanProvider scanProvider =
new DataStreamScanProvider() {
@Override
@@ -438,6 +439,10 @@ class TransformationsTest {
@Override
public DataStream<RowData> produceDataStream(
ProviderContext providerContext,
StreamExecutionEnvironment execEnv) {
+
+ assertThat(providerContext.getContainerNodeType())
+ .isEqualTo(expectedSourceExecNode);
+
// UID 1
final SingleOutputStreamOperator<Integer> ints =
execEnv.fromData(1, 2, 3);
providerContext.generateUid("my-source").ifPresent(ints::uid);