This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 881b2bf046e510b1b6dddddf8c15af45926397f1
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu Aug 11 11:18:46 2022 +0200

    [FLINK-28861][table] Fix bug in UID format for future migrations and make it configurable
    
    Before this commit, the UID format was not future-proof for migrations. The ExecNode version
    should not be part of the UID; otherwise, operator migration won't be possible once plan migration
    is executed. See the FLIP-190 example, which drops a version from the plan once operator migration
    has been performed. Given that the plan feature is marked as @Experimental, this change can
    still be made without providing backwards compatibility.
    
    However, the config option table.exec.uid.format allows restoring the old format and solves
    other UID-related issues along the way.
    
    This closes #20555.
---
 .../generated/execution_config_configuration.html  |  6 ++
 .../table/api/config/ExecutionConfigOptions.java   | 13 +++++
 .../planner/plan/nodes/exec/ExecNodeBase.java      |  9 +--
 .../planner/plan/nodes/exec/ExecNodeContext.java   | 20 ++++++-
 .../plan/nodes/exec/common/CommonExecSink.java     |  2 +-
 .../exec/common/CommonExecTableSourceScan.java     |  2 +-
 .../nodes/exec/stream/StreamExecIntervalJoin.java  | 11 ++--
 .../plan/nodes/exec/TransformationsTest.java       | 66 ++++++++++++++++++++++
 .../flink/table/planner/utils/JsonTestUtils.java   | 20 +++++++
 9 files changed, 136 insertions(+), 13 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index d6292edc1b4..d837fd4ab66 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -160,6 +160,12 @@ By default no operator is disabled.</td>
             <td>Duration</td>
             <td>Specifies a minimum time interval for how long idle state 
(i.e. state which was not updated), will be retained. State will never be 
cleared until it was idle for less than the minimum time, and will be cleared 
at some time after it was idle. Default is never clean-up the state. NOTE: 
Cleaning up state requires additional overhead for bookkeeping. Default value 
is 0, which means that it will never clean up state.</td>
         </tr>
+        <tr>
+            <td><h5>table.exec.uid.format</h5><br> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: 
break-word;">"&lt;id&gt;_&lt;transformation&gt;"</td>
+            <td>String</td>
+            <td>Defines the format pattern for generating the UID of an 
ExecNode streaming transformation. The pattern can be defined globally or 
per-ExecNode in the compiled plan. Supported arguments are: &lt;id&gt; (from 
static counter), &lt;type&gt; (e.g. 'stream-exec-sink'), &lt;version&gt;, and 
&lt;transformation&gt; (e.g. 'constraint-validator' for a sink). In Flink 
1.15.x the pattern was wrongly defined as 
'&lt;id&gt;_&lt;type&gt;_&lt;version&gt;_&lt;transformation&gt;' which would prevent migrations in the future.</td>
+        </tr>
         <tr>
             <td><h5>table.exec.uid.generation</h5><br> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">PLAN_ONLY</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 2f449dfcf6b..c14546c192f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -529,6 +529,19 @@ public class ExecutionConfigOptions {
                                                     + "affecting the stable 
UIDs.")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<String> TABLE_EXEC_UID_FORMAT =
+            key("table.exec.uid.format")
+                    .stringType()
+                    .defaultValue("<id>_<transformation>")
+                    .withDescription(
+                            "Defines the format pattern for generating the UID 
of an ExecNode streaming transformation. "
+                                    + "The pattern can be defined globally or 
per-ExecNode in the compiled plan. "
+                                    + "Supported arguments are: <id> (from 
static counter), <type> (e.g. 'stream-exec-sink'), "
+                                    + "<version>, and <transformation> (e.g. 
'constraint-validator' for a sink). "
+                                    + "In Flink 1.15.x the pattern was wrongly 
defined as '<id>_<type>_<version>_<transformation>' "
+                                    + "which would prevent migrations in the 
future.");
+
     // 
------------------------------------------------------------------------------------------
     // Enum option types
     // 
------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 0e949aa666b..2c7a33042a0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -207,8 +207,8 @@ public abstract class ExecNodeBase<T> implements 
ExecNode<T> {
         return getClass().getSimpleName().replace("StreamExec", 
"").replace("BatchExec", "");
     }
 
-    protected String createTransformationUid(String operatorName) {
-        return context.generateUid(operatorName);
+    protected String createTransformationUid(String operatorName, 
ExecNodeConfig config) {
+        return context.generateUid(operatorName, config);
     }
 
     protected String createTransformationName(ReadableConfig config) {
@@ -226,7 +226,7 @@ public abstract class ExecNodeBase<T> implements 
ExecNode<T> {
                     createTransformationName(config), 
createTransformationDescription(config));
         } else {
             return new TransformationMetadata(
-                    createTransformationUid(operatorName),
+                    createTransformationUid(operatorName, config),
                     createTransformationName(config),
                     createTransformationDescription(config));
         }
@@ -239,7 +239,8 @@ public abstract class ExecNodeBase<T> implements 
ExecNode<T> {
         if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || 
!config.shouldSetUid()) {
             return new TransformationMetadata(name, desc);
         } else {
-            return new 
TransformationMetadata(createTransformationUid(operatorName), name, desc);
+            return new TransformationMetadata(
+                    createTransformationUid(operatorName, config), name, desc);
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
index 0abb680b136..cc3436916c6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -30,6 +31,8 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
 
+import org.apache.commons.lang3.StringUtils;
+
 import javax.annotation.Nullable;
 
 import java.util.List;
@@ -122,7 +125,7 @@ public final class ExecNodeContext {
     }
 
     /** Returns a new {@code uid} for transformations. */
-    public String generateUid(String transformationName) {
+    public String generateUid(String transformationName, ExecNodeConfig 
config) {
         if (!transformationNamePattern.matcher(transformationName).matches()) {
             throw new TableException(
                     "Invalid transformation name '"
@@ -130,7 +133,20 @@ public final class ExecNodeContext {
                             + "'. "
                             + "This is a bug, please file an issue.");
         }
-        return String.format("%s_%s_%s", getId(), getTypeAsString(), 
transformationName);
+        final String uidPattern = 
config.get(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT);
+        // Note: name and version are not included in the UID by default as 
they would prevent
+        // migration.
+        // No version because: An operator can change its state layout and 
bump up the ExecNode
+        // version, in this case the UID should still be able to map state 
even after plan
+        // migration to the new version.
+        // No name because: We might fuse operators in the future, and a new 
operator might
+        // subscribe to multiple old UIDs.
+        return StringUtils.replaceEach(
+                uidPattern,
+                new String[] {"<id>", "<type>", "<version>", 
"<transformation>"},
+                new String[] {
+                    String.valueOf(id), name, String.valueOf(version), 
transformationName
+                });
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 5831bd3ed76..71f8184df2c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -532,7 +532,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
     private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
             if (this instanceof StreamExecNode && config.shouldSetUid()) {
-                return Optional.of(createTransformationUid(name));
+                return Optional.of(createTransformationUid(name, config));
             }
             return Optional.empty();
         };
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index baebd1af435..a008f881a56 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -158,7 +158,7 @@ public abstract class CommonExecTableSourceScan extends 
ExecNodeBase<RowData>
     private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
             if (this instanceof StreamExecNode && config.shouldSetUid()) {
-                return Optional.of(createTransformationUid(name));
+                return Optional.of(createTransformationUid(name, config));
             }
             return Optional.empty();
         };
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index 347203641e6..70288dd1ce6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -236,7 +236,7 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
             int leftArity,
             int rightArity,
             InternalTypeInfo<RowData> returnTypeInfo,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         boolean shouldCreateUid =
                 
config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
 
@@ -260,7 +260,7 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
                         returnTypeInfo,
                         leftParallelism);
         if (shouldCreateUid) {
-            
filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION));
+            
filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION, 
config));
         }
         filterAllLeftStream.setDescription(
                 createFormattedTransformationDescription(
@@ -277,7 +277,8 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
                         returnTypeInfo,
                         rightParallelism);
         if (shouldCreateUid) {
-            
filterAllRightStream.setUid(createTransformationUid(FILTER_RIGHT_TRANSFORMATION));
+            filterAllRightStream.setUid(
+                    createTransformationUid(FILTER_RIGHT_TRANSFORMATION, 
config));
         }
         filterAllRightStream.setDescription(
                 createFormattedTransformationDescription(
@@ -294,7 +295,7 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
                         returnTypeInfo,
                         leftParallelism);
         if (shouldCreateUid) {
-            
padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION));
+            
padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION, config));
         }
         padLeftStream.setDescription(
                 createFormattedTransformationDescription("pad left input 
transformation", config));
@@ -310,7 +311,7 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
                         returnTypeInfo,
                         rightParallelism);
         if (shouldCreateUid) {
-            
padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION));
+            
padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION, 
config));
         }
         padRightStream.setDescription(
                 createFormattedTransformationDescription("pad right input 
transformation", config));
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 20cf5080f90..4fa88b2345b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -31,19 +31,26 @@ import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
+import org.apache.flink.table.planner.utils.JsonTestUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT;
 import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION;
 import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.ALWAYS;
 import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.DISABLED;
@@ -192,10 +199,69 @@ class TransformationsTest {
         }
     }
 
+    @Test
+    public void testUidDefaults() throws IOException {
+        checkUidModification(
+                config -> {}, json -> {}, "\\d+_sink", 
"\\d+_constraint-validator", "\\d+_values");
+    }
+
+    @Test
+    public void testUidFlink1_15() throws IOException {
+        checkUidModification(
+                config ->
+                        config.set(TABLE_EXEC_UID_FORMAT, 
"<id>_<type>_<version>_<transformation>"),
+                json -> {},
+                "\\d+_stream-exec-sink_1_sink",
+                "\\d+_stream-exec-sink_1_constraint-validator",
+                "\\d+_stream-exec-values_1_values");
+    }
+
+    @Test
+    public void testPerNodeCustomUid() throws IOException {
+        checkUidModification(
+                config -> {},
+                json ->
+                        JsonTestUtils.setExecNodeConfig(
+                                json,
+                                "stream-exec-sink_1",
+                                TABLE_EXEC_UID_FORMAT.key(),
+                                "my_custom_<transformation>_<id>"),
+                "my_custom_sink_\\d+",
+                "my_custom_constraint-validator_\\d+",
+                "\\d+_values");
+    }
+
+    private static void checkUidModification(
+            Consumer<TableConfig> configModifier,
+            Consumer<JsonNode> jsonModifier,
+            String... expectedUidPatterns)
+            throws IOException {
+        final TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        configModifier.accept(env.getConfig());
+        final String plan = minimalPlan(env).asJsonString();
+        final JsonNode json = JsonTestUtils.readFromString(plan);
+        jsonModifier.accept(json);
+        final List<String> planUids =
+                CompiledPlanUtils.toTransformations(
+                                env, 
env.loadPlan(PlanReference.fromJsonString(json.toString())))
+                        .get(0).getTransitivePredecessors().stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(planUids).hasSize(expectedUidPatterns.length);
+        IntStream.range(0, expectedUidPatterns.length)
+                .forEach(i -> 
assertThat(planUids.get(i)).matches(expectedUidPatterns[i]));
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Helper methods
     // 
--------------------------------------------------------------------------------------------
 
+    private static CompiledPlan minimalPlan(TableEnvironment env) {
+        return env.fromValues(1, 2, 3)
+                .insertInto(TableDescriptor.forConnector("blackhole").build())
+                .compilePlan();
+    }
+
     private static LegacySourceTransformation<?> toLegacySourceTransformation(
             StreamTableEnvironment env, Table table) {
         Transformation<?> transform = 
env.toChangelogStream(table).getTransformation();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
index 6aa9f96c7d0..c4595a5ccd3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
@@ -39,8 +39,28 @@ public final class JsonTestUtils {
         return 
OBJECT_MAPPER_INSTANCE.readTree(JsonTestUtils.class.getResource(path));
     }
 
+    public static JsonNode readFromString(String path) throws IOException {
+        return OBJECT_MAPPER_INSTANCE.readTree(path);
+    }
+
     public static JsonNode setFlinkVersion(JsonNode target, FlinkVersion 
flinkVersion) {
         return ((ObjectNode) target)
                 .set("flinkVersion", 
OBJECT_MAPPER_INSTANCE.valueToTree(flinkVersion.toString()));
     }
+
+    public static JsonNode setExecNodeConfig(
+            JsonNode target, String type, String key, String value) {
+        target.get("nodes")
+                .elements()
+                .forEachRemaining(
+                        n -> {
+                            if (n.get("type").asText().equals(type)) {
+                                final ObjectNode configNode =
+                                        
OBJECT_MAPPER_INSTANCE.createObjectNode();
+                                configNode.put(key, value);
+                                ((ObjectNode) n).set("configuration", 
configNode);
+                            }
+                        });
+        return target;
+    }
 }

Reply via email to