This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c02879020fc3911d22f986be60d5b58de08684f9 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Fri Dec 12 10:12:29 2025 +0100 [hotfix] Pull legacyUidsEnabled as an abstract method to CommonExecTableSourceScan --- .../plan/nodes/exec/batch/BatchExecTableSourceScan.java | 7 ++++++- .../plan/nodes/exec/common/CommonExecTableSourceScan.java | 11 +++++++---- .../plan/nodes/exec/stream/StreamExecTableSourceScan.java | 11 ++--------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java index c14d9988bf6..6dda7193224 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java @@ -124,7 +124,7 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan protected Transformation<RowData> translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { final Transformation<RowData> transformation = - super.createTransformation(planner, config, false); + super.translateToPlanInternal(planner, config); // the boundedness has been checked via the runtime provider already, so we can safely // declare all legacy transformations as bounded to make the stream graph generator happy ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation); @@ -164,6 +164,11 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan return env.addSource(function, operatorName, outputTypeInfo).getTransformation(); } + @Override + protected final boolean legacyUidsEnabled() { + return false; + } + public BatchExecTableSourceScan copyAndRemoveInputs() { BatchExecTableSourceScan tableSourceScan = new BatchExecTableSourceScan( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java index 460530c49cd..1d7526aba77 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java @@ -112,8 +112,9 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> return tableSourceSpec; } - protected Transformation<RowData> createTransformation( - PlannerBase planner, ExecNodeConfig config, boolean legacyUidsEnabled) { + @Override + protected Transformation<RowData> translateToPlanInternal( + PlannerBase planner, ExecNodeConfig config) { final Transformation<RowData> sourceTransform; final StreamExecutionEnvironment env = planner.getExecEnv(); final TransformationMetadata metadata = @@ -186,7 +187,7 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> ((DataStreamScanProvider) provider) .produceDataStream(createProviderContext(metadata, config), env) .getTransformation(); - if (legacyUidsEnabled) { + if (legacyUidsEnabled()) { metadata.fill(sourceTransform); } sourceTransform.setOutputType(outputTypeInfo); @@ -194,7 +195,7 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> sourceTransform = ((TransformationScanProvider) provider) .createTransformation(createProviderContext(metadata, config)); - if (legacyUidsEnabled) { + if (legacyUidsEnabled()) { metadata.fill(sourceTransform); } sourceTransform.setOutputType(outputTypeInfo); @@ -369,4 +370,6 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> InputFormat<RowData, ?> inputFormat, InternalTypeInfo<RowData> outputTypeInfo, String operatorName); + + protected abstract boolean legacyUidsEnabled(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java index 96d21f12022..412b9a42bb6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java @@ -25,9 +25,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan; @@ -94,12 +92,6 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan description); } - @Override - protected Transformation<RowData> translateToPlanInternal( - PlannerBase planner, ExecNodeConfig config) { - return createTransformation(planner, config, legacyUidsEnabled()); - } - @Override public Transformation<RowData> createInputFormatTransformation( StreamExecutionEnvironment env, @@ -111,7 +103,8 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan return env.createInput(inputFormat, outputTypeInfo).name(operatorName).getTransformation(); } - private boolean legacyUidsEnabled() { + @Override + protected final boolean legacyUidsEnabled() { return getVersion() == 1; } }
