This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cefcac6996b9f59dc7ea6ae6e9a7bb35ed4504cb Author: Fabian Paul <fabianp...@ververica.com> AuthorDate: Tue Mar 15 13:13:02 2022 +0100 [FLINK-26613][streaming] Use Flink 1.13 sink committer operator uid pattern Since there is no dedicated committer operator in Flink 1.14 it is safe to use the uid pattern of 1.13 to ease upgrades from Flink 1.13 to 1.15. --- .../translators/SinkTransformationTranslator.java | 7 ++++--- .../graph/SinkTransformationTranslatorTest.java | 23 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index 782ccca..0dd6087 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -355,9 +355,10 @@ public class SinkTransformationTranslator<Input, Output> BiConsumer<Transformation<?>, String> setter, @Nullable String transformationName) { if (transformationName != null && getter.apply(transformation) != null) { - // Use the same uid pattern than for Sink V1 + // Use the same uid pattern than for Sink V1. We deliberately decided to use the uid + // pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator. if (transformationName.equals(COMMITTER_NAME)) { - final String committerFormat = "Sink %s Committer"; + final String committerFormat = "Sink Committer: %s"; setter.accept( subTransformation, String.format(committerFormat, getter.apply(transformation))); @@ -369,7 +370,7 @@ public class SinkTransformationTranslator<Input, Output> return; } - // Use the same uid pattern than for Sink V1 + // Use the same uid pattern than for Sink V1 in Flink 1.14. if (transformationName.equals( StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) { final String committerFormat = "Sink %s Global Committer"; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java index 164a8c4..e2086ec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java @@ -293,6 +293,29 @@ public class SinkTransformationTranslatorTest extends TestLogger { assertEquals(findGlobalCommitter(streamGraph).getUserHash(), globalCommitterHash); } + /** + * When ever you need to change something in this test case please think about possible state + * upgrade problems introduced by your changes. + */ + @Test + public void testSettingOperatorUids() { + final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead"; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStreamSource<Integer> src = env.fromElements(1, 2); + src.sinkTo(TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build()) + .name(NAME) + .uid(sinkUid); + + final StreamGraph streamGraph = env.getStreamGraph(); + assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid); + assertEquals( + findCommitter(streamGraph).getTransformationUID(), + String.format("Sink Committer: %s", sinkUid)); + assertEquals( + findGlobalCommitter(streamGraph).getTransformationUID(), + String.format("Sink %s Global Committer", sinkUid)); + } + private void validateTopology( StreamNode src, Class<?> srcOutTypeInfo,