This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f352dca [FLINK-25074][datastream] Simplify name of WindowOperator f352dca is described below commit f352dca3724deb8a42b36102bc298b58158df90d Author: 龙三 <wenlong....@alibaba-inc.com> AuthorDate: Tue Nov 23 17:14:23 2021 +0800 [FLINK-25074][datastream] Simplify name of WindowOperator This closes #18056. --- .../api/datastream/AllWindowedStream.java | 22 +++++---- .../streaming/api/datastream/WindowedStream.java | 37 +++++++++------ .../operators/windowing/WindowOperatorBuilder.java | 6 ++- .../apache/flink/streaming/api/DataStreamTest.java | 52 ++++++++++++++++++++++ 4 files changed, 94 insertions(+), 23 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index de9299c..9303218 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -259,7 +259,8 @@ public class AllWindowedStream<T, W extends Window> { String callLocation = Utils.getCallLocationName(); String udfName = "AllWindowedStream." + callLocation; - String opName; + String opName = windowAssigner.getClass().getSimpleName(); + String opDescription; KeySelector<T, Byte> keySel = input.getKeySelector(); OneInputStreamOperator<T, R> operator; @@ -276,7 +277,7 @@ public class AllWindowedStream<T, W extends Window> { ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - opName = + opDescription = "TriggerWindow(" + windowAssigner + ", " @@ -313,7 +314,7 @@ public class AllWindowedStream<T, W extends Window> { input.getType() .createSerializer(getExecutionEnvironment().getConfig())); - opName = + opDescription = "TriggerWindow(" + windowAssigner + ", " @@ -339,7 +340,9 @@ public class AllWindowedStream<T, W extends Window> { lateDataOutputTag); } - return input.transform(opName, resultType, operator).forceNonParallel(); + return input.transform(opName, resultType, operator) + .setDescription(opDescription) + .forceNonParallel(); } /** @@ -808,7 +811,8 @@ public class AllWindowedStream<T, W extends Window> { final String callLocation = Utils.getCallLocationName(); final String udfName = "AllWindowedStream." + callLocation; - final String opName; + final String opName = windowAssigner.getClass().getSimpleName(); + final String opDescription; final KeySelector<T, Byte> keySel = input.getKeySelector(); OneInputStreamOperator<T, R> operator; @@ -825,7 +829,7 @@ public class AllWindowedStream<T, W extends Window> { ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); - opName = + opDescription = "TriggerWindow(" + windowAssigner + ", " @@ -862,7 +866,7 @@ public class AllWindowedStream<T, W extends Window> { accumulatorType.createSerializer( getExecutionEnvironment().getConfig())); - opName = + opDescription = "TriggerWindow(" + windowAssigner + ", " @@ -888,7 +892,9 @@ public class AllWindowedStream<T, W extends Window> { lateDataOutputTag); } - return input.transform(opName, resultType, operator).forceNonParallel(); + return input.transform(opName, resultType, operator) + .setDescription(opDescription) + .forceNonParallel(); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 0e63d66..efec571 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -211,10 +211,11 @@ public class WindowedStream<T, K, W extends Window> { function = input.getExecutionEnvironment().clean(function); reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); - final String opName = builder.generateOperatorName(reduceFunction, function); + final String opName = builder.generateOperatorName(); + final String opDescription = builder.generateOperatorDescription(reduceFunction, function); OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDescription); } /** @@ -258,10 +259,11 @@ public class WindowedStream<T, K, W extends Window> { function = input.getExecutionEnvironment().clean(function); reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); - final String opName = builder.generateOperatorName(reduceFunction, function); + final String opName = builder.generateOperatorName(); + final String opDescription = builder.generateOperatorDescription(reduceFunction, function); OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDescription); } // ------------------------------------------------------------------------ @@ -405,12 +407,14 @@ public class WindowedStream<T, K, W extends Window> { windowFunction = input.getExecutionEnvironment().clean(windowFunction); aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction); - final String opName = builder.generateOperatorName(aggregateFunction, windowFunction); + final String opName = builder.generateOperatorName(); + final String opDescription = + builder.generateOperatorDescription(aggregateFunction, windowFunction); OneInputStreamOperator<T, R> operator = builder.aggregate(aggregateFunction, windowFunction, accumulatorType); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDescription); } /** @@ -514,12 +518,14 @@ public class WindowedStream<T, K, W extends Window> { windowFunction = input.getExecutionEnvironment().clean(windowFunction); aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction); - final String opName = builder.generateOperatorName(aggregateFunction, windowFunction); + final String opName = builder.generateOperatorName(); + final String opDescription = + builder.generateOperatorDescription(aggregateFunction, windowFunction); OneInputStreamOperator<T, R> operator = builder.aggregate(aggregateFunction, windowFunction, accumulatorType); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDescription); } // ------------------------------------------------------------------------ @@ -559,10 +565,11 @@ public class WindowedStream<T, K, W extends Window> { WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { function = input.getExecutionEnvironment().clean(function); - final String opName = builder.generateOperatorName(function, null); + final String opName = builder.generateOperatorName(); + final String opDescription = builder.generateOperatorDescription(function, null); OneInputStreamOperator<T, R> operator = builder.apply(function); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDescription); } /** @@ -601,11 +608,12 @@ public class WindowedStream<T, K, W extends Window> { ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { function = input.getExecutionEnvironment().clean(function); - final String opName = builder.generateOperatorName(function, null); + final String opName = builder.generateOperatorName(); + final String opDesc = builder.generateOperatorDescription(function, null); OneInputStreamOperator<T, R> operator = builder.process(function); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDesc); } /** @@ -651,11 +659,12 @@ public class WindowedStream<T, K, W extends Window> { function = input.getExecutionEnvironment().clean(function); reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); - final String opName = builder.generateOperatorName(reduceFunction, function); + final String opName = builder.generateOperatorName(); + final String opDesc = builder.generateOperatorDescription(reduceFunction, function); OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function); - return input.transform(opName, resultType, operator); + return input.transform(opName, resultType, operator).setDescription(opDesc); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java index 9fb1506..f9db7a3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java @@ -326,7 +326,11 @@ public class WindowOperatorBuilder<T, K, W extends Window> { } } - public String generateOperatorName(Function function1, @Nullable Function function2) { + public String generateOperatorName() { + return windowAssigner.getClass().getSimpleName(); + } + + public String generateOperatorDescription(Function function1, @Nullable Function function2) { return "Window(" + windowAssigner + ", " diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 144d836..ea61b18 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -69,6 +69,8 @@ import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; @@ -1104,6 +1106,56 @@ public class DataStreamTest extends TestLogger { }); } + /** Tests that verifies window operator has different name and description. */ + @Test + public void testWindowOperatorDescription() { + // global window + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Long> dataStream1 = + env.generateSequence(0, 0) + .windowAll(GlobalWindows.create()) + .trigger(PurgingTrigger.of(CountTrigger.of(10))) + .reduce( + new ReduceFunction<Long>() { + private static final long serialVersionUID = 1L; + + @Override + public Long reduce(Long value1, Long value2) throws Exception { + return null; + } + }); + // name is simplified + assertEquals("GlobalWindows", dataStream1.getTransformation().getName()); + // description contains detail of function: + // TriggerWindow(GlobalWindows(), ReducingStateDescriptor{name=window-contents, + // defaultValue=null, + // serializer=org.apache.flink.api.common.typeutils.base.LongSerializer@6af9fcb2}, + // PurgingTrigger(CountTrigger(10)), AllWindowedStream.reduce(AllWindowedStream.java:229)) + assertTrue(dataStream1.getTransformation().getDescription().contains("PurgingTrigger")); + + // keyed window + DataStream<Long> dataStream2 = + env.generateSequence(0, 0) + .keyBy(value -> value) + .window(TumblingEventTimeWindows.of(Time.milliseconds(1000))) + .trigger(PurgingTrigger.of(CountTrigger.of(10))) + .reduce( + new ReduceFunction<Long>() { + private static final long serialVersionUID = 1L; + + @Override + public Long reduce(Long value1, Long value2) throws Exception { + return null; + } + }); + // name is simplified + assertEquals("TumblingEventTimeWindows", dataStream2.getTransformation().getName()); + // description contains detail of function: + // Window(TumblingEventTimeWindows(1000), PurgingTrigger, ReduceFunction$36, + // PassThroughWindowFunction) + assertTrue(dataStream2.getTransformation().getDescription().contains("PurgingTrigger")); + } + /** * Tests {@link SingleOutputStreamOperator#setDescription(String)} functionality. *