This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cefcac6996b9f59dc7ea6ae6e9a7bb35ed4504cb
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Tue Mar 15 13:13:02 2022 +0100

    [FLINK-26613][streaming] Use Flink 1.13 sink committer operator uid pattern
    
    Since there is no dedicated committer operator in Flink 1.14, it is safe
    to use the uid pattern of 1.13 to ease upgrades from Flink 1.13 to 1.15.
---
 .../translators/SinkTransformationTranslator.java  |  7 ++++---
 .../graph/SinkTransformationTranslatorTest.java    | 23 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 782ccca..0dd6087 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -355,9 +355,10 @@ public class SinkTransformationTranslator<Input, Output>
                 BiConsumer<Transformation<?>, String> setter,
                 @Nullable String transformationName) {
             if (transformationName != null && getter.apply(transformation) != 
null) {
-                // Use the same uid pattern than for Sink V1
+                // Use the same uid pattern as for Sink V1. We deliberately decided to use the uid
+                // pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
                 if (transformationName.equals(COMMITTER_NAME)) {
-                    final String committerFormat = "Sink %s Committer";
+                    final String committerFormat = "Sink Committer: %s";
                     setter.accept(
                             subTransformation,
                             String.format(committerFormat, 
getter.apply(transformation)));
@@ -369,7 +370,7 @@ public class SinkTransformationTranslator<Input, Output>
                     return;
                 }
 
-                // Use the same uid pattern than for Sink V1
+                // Use the same uid pattern as for Sink V1 in Flink 1.14.
                 if (transformationName.equals(
                         
StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
                     final String committerFormat = "Sink %s Global Committer";
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
index 164a8c4..e2086ec 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
@@ -293,6 +293,29 @@ public class SinkTransformationTranslatorTest extends 
TestLogger {
         assertEquals(findGlobalCommitter(streamGraph).getUserHash(), 
globalCommitterHash);
     }
 
+    /**
+     * Whenever you need to change something in this test case, please think about possible state
+     * upgrade problems introduced by your changes.
+     */
+    @Test
+    public void testSettingOperatorUids() {
+        final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        
src.sinkTo(TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build())
+                .name(NAME)
+                .uid(sinkUid);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
+        assertEquals(
+                findCommitter(streamGraph).getTransformationUID(),
+                String.format("Sink Committer: %s", sinkUid));
+        assertEquals(
+                findGlobalCommitter(streamGraph).getTransformationUID(),
+                String.format("Sink %s Global Committer", sinkUid));
+    }
+
     private void validateTopology(
             StreamNode src,
             Class<?> srcOutTypeInfo,

Reply via email to