This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 831cb8eff022db5543052c96716518c86aaaa2c2 Author: Wencong Liu <liuwencle...@163.com> AuthorDate: Tue Aug 29 12:37:07 2023 +0800 [FLINK-32979] Migrate the usage of getDefaultTrigger(StreamExecutionEnvironment env) to getDefaultTrigger() This closes #23073 --- .../org/apache/flink/state/api/WindowedOperatorTransformation.java | 2 +- .../org/apache/flink/state/api/WindowedStateTransformation.java | 2 +- .../apache/flink/streaming/api/datastream/AllWindowedStream.java | 2 +- .../org/apache/flink/streaming/api/datastream/WindowedStream.java | 2 +- .../api/windowing/assigners/DynamicEventTimeSessionWindows.java | 7 +++++++ .../windowing/assigners/DynamicProcessingTimeSessionWindows.java | 7 +++++++ .../streaming/api/windowing/assigners/EventTimeSessionWindows.java | 6 ++++++ .../flink/streaming/api/windowing/assigners/GlobalWindows.java | 6 ++++++ .../api/windowing/assigners/ProcessingTimeSessionWindows.java | 6 ++++++ .../streaming/api/windowing/assigners/SlidingEventTimeWindows.java | 6 ++++++ .../api/windowing/assigners/SlidingProcessingTimeWindows.java | 6 ++++++ .../api/windowing/assigners/TumblingEventTimeWindows.java | 6 ++++++ .../api/windowing/assigners/TumblingProcessingTimeWindows.java | 6 ++++++ .../operators/windowing/DynamicEventTimeSessionWindowsTest.java | 5 +---- .../windowing/DynamicProcessingTimeSessionWindowsTest.java | 5 +---- .../runtime/operators/windowing/EventTimeSessionWindowsTest.java | 5 +---- .../streaming/runtime/operators/windowing/GlobalWindowsTest.java | 5 +---- .../runtime/operators/windowing/MergingWindowSetTest.java | 6 ++++++ .../operators/windowing/ProcessingTimeSessionWindowsTest.java | 5 +---- .../runtime/operators/windowing/SlidingEventTimeWindowsTest.java | 5 +---- .../operators/windowing/SlidingProcessingTimeWindowsTest.java | 5 +---- .../runtime/operators/windowing/TumblingEventTimeWindowsTest.java | 5 +---- .../operators/windowing/TumblingProcessingTimeWindowsTest.java | 5 +---- 23 files changed, 75 insertions(+), 40 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java index f0fdc8cabd5..ed72d7eaf19 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java @@ -84,7 +84,7 @@ public class WindowedOperatorTransformation<T, K, W extends Window> { this.builder = new WindowOperatorBuilder<>( windowAssigner, - windowAssigner.getDefaultTrigger(null), + windowAssigner.getDefaultTrigger(), input.getExecutionEnvironment().getConfig(), input.getType(), keySelector, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java index fc0f85812e8..5d42ba7811c 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java @@ -77,7 +77,7 @@ public class WindowedStateTransformation<T, K, W extends Window> { this.builder = new WindowOperatorBuilder<>( windowAssigner, - windowAssigner.getDefaultTrigger(null), + windowAssigner.getDefaultTrigger(), input.getExecutionEnvironment().getConfig(), input.getType(), keySelector, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 33df648bab0..639d2af5f64 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -111,7 +111,7 @@ public class AllWindowedStream<T, W extends Window> { public AllWindowedStream(DataStream<T> input, WindowAssigner<? super T, W> windowAssigner) { this.input = input.keyBy(new NullByteKeySelector<T>()); this.windowAssigner = windowAssigner; - this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()); + this.trigger = windowAssigner.getDefaultTrigger(); } /** Sets the {@code Trigger} that should be used to trigger window emission. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 283b14a53d8..7e0b9aa48f1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -83,7 +83,7 @@ public class WindowedStream<T, K, W extends Window> { this.builder = new WindowOperatorBuilder<>( windowAssigner, - windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), + windowAssigner.getDefaultTrigger(), input.getExecutionConfig(), input.getType(), input.getKeySelector(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java index 257fe471a68..c7c70681243 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java @@ -68,6 +68,13 @@ public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, @SuppressWarnings("unchecked") @Override public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @SuppressWarnings("unchecked") + @Override + public Trigger<T, TimeWindow> getDefaultTrigger() { return (Trigger<T, TimeWindow>) EventTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java index bbd6fa800cb..58e7aac6598 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java @@ -70,6 +70,13 @@ public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigne @SuppressWarnings("unchecked") @Override public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @SuppressWarnings("unchecked") + @Override + public Trigger<T, TimeWindow> getDefaultTrigger() { return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java index 37b1b52c4d2..ee3a0e0c263 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java @@ -65,6 +65,12 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return EventTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 5c388665604..b7f0f5c074d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -50,6 +50,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { @Override public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, GlobalWindow> getDefaultTrigger() { return new NeverTrigger(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java index 38b68b5d710..c82309e7259 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java @@ -67,6 +67,12 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return ProcessingTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index 8c71178495c..3711d67b0d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -94,6 +94,12 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return EventTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java index e23bfc4fac7..265a09286bf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -86,6 +86,12 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return ProcessingTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java index b3635ac133c..787514ca523 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java @@ -89,6 +89,12 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return EventTimeTrigger.create(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index 60f36081da1..e22b63acf57 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -85,6 +85,12 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return ProcessingTimeTrigger.create(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java index 3bb0c2b7aad..a5a312e9ef7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor; @@ -229,8 +228,6 @@ public class DynamicEventTimeSessionWindowsTest extends TestLogger { assertTrue(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(EventTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(EventTimeTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java index 240bdae064f..705d4f542cf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor; @@ -233,8 +232,6 @@ public class DynamicProcessingTimeSessionWindowsTest extends TestLogger { assertFalse(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(ProcessingTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(ProcessingTimeTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java index b4f4f17d032..31e908744e2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; @@ -221,9 +220,7 @@ public class EventTimeSessionWindowsTest extends TestLogger { assertTrue(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(EventTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(EventTimeTrigger.class)); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java index 771c52a27dd..d543eb84980 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; @@ -58,8 +57,6 @@ public class GlobalWindowsTest extends TestLogger { assertFalse(assigner.isEventTime()); assertEquals( new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(GlobalWindows.NeverTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(GlobalWindows.NeverTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index c8234ab05e8..542eef60eb9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -570,6 +570,12 @@ public class MergingWindowSetTest { @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + throw new UnsupportedOperationException( + "This method is deprecated and shouldn't be invoked. Please use getDefaultTrigger() instead."); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger() { return EventTimeTrigger.create(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java index db4d675e977..4702be4384f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; @@ -233,9 +232,7 @@ public class ProcessingTimeSessionWindowsTest extends TestLogger { assertFalse(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(ProcessingTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(ProcessingTimeTrigger.class)); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java index a688ac14e8f..9e2df1694ec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; @@ -237,8 +236,6 @@ public class SlidingEventTimeWindowsTest extends TestLogger { assertTrue(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(EventTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(EventTimeTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java index 299baaac1eb..5fa21c4fc49 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; @@ -250,8 +249,6 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger { assertFalse(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(ProcessingTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(ProcessingTimeTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java index d0fe400a64c..166a10827bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowStagger; @@ -173,8 +172,6 @@ public class TumblingEventTimeWindowsTest extends TestLogger { assertTrue(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(EventTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(EventTimeTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java index 70b8bd01a47..981f7cd70bb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowStagger; @@ -199,8 +198,6 @@ public class TumblingProcessingTimeWindowsTest extends TestLogger { assertFalse(assigner.isEventTime()); assertEquals( new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); - assertThat( - assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), - instanceOf(ProcessingTimeTrigger.class)); + assertThat(assigner.getDefaultTrigger(), instanceOf(ProcessingTimeTrigger.class)); } }