This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0dc3220b71a [FLINK-38363][Runtime] WindowedStream.trigger supports
trigger with async state (#27021)
0dc3220b71a is described below
commit 0dc3220b71ab276e070f0c63f131025998fb708d
Author: xia rui <[email protected]>
AuthorDate: Tue Sep 23 23:31:35 2025 +0800
[FLINK-38363][Runtime] WindowedStream.trigger supports trigger with async
state (#27021)
---
.../streaming/api/datastream/WindowedStream.java | 14 ++
.../operators/windowing/WindowTranslationTest.java | 257 +++++++++++++--------
2 files changed, 181 insertions(+), 90 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index a05d3293003..11dc99f7e6c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -102,6 +103,19 @@ public class WindowedStream<T, K, W extends Window> {
return this;
}
+ /**
+ * Sets the {@code AsyncTrigger} that should be used to trigger window
emission.
+ *
+ * <p>Will automatically enable async state for {@code WindowedStream}.
+ */
+ @Experimental
+ public WindowedStream<T, K, W> trigger(AsyncTrigger<? super T, ? super W>
trigger) {
+ enableAsyncState();
+
+ builder.asyncTrigger(trigger);
+ return this;
+ }
+
/**
* Sets the time by which elements are allowed to be late. Elements that
arrive behind the
* watermark by more than the specified time will be dropped. By default,
the allowed lateness
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 28cc3b570af..ef0c9ef8053 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,11 +31,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -54,9 +57,12 @@ import
org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import
org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
@@ -1059,9 +1065,10 @@ class WindowTranslationTest {
new Tuple2<>("hello", 1));
}
- @Test
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
@SuppressWarnings("rawtypes")
- void testReduceWithCustomTrigger() throws Exception {
+ void testReduceWithCustomTrigger(boolean enableAsyncState) throws
Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source =
@@ -1070,134 +1077,199 @@ class WindowTranslationTest {
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 =
- source.keyBy(x -> x.f0)
- .window(
- SlidingEventTimeWindows.of(
- Duration.ofSeconds(1),
Duration.ofMillis(100)))
- .trigger(CountTrigger.of(1))
- .reduce(reducer);
+ enableAsyncState
+ ? source.keyBy(x -> x.f0)
+ .window(
+ SlidingEventTimeWindows.of(
+ Duration.ofSeconds(1),
Duration.ofMillis(100)))
+ .trigger(AsyncCountTrigger.of(1))
+ .reduce(reducer)
+ : source.keyBy(x -> x.f0)
+ .window(
+ SlidingEventTimeWindows.of(
+ Duration.ofSeconds(1),
Duration.ofMillis(100)))
+ .trigger(CountTrigger.of(1))
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String,
Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>,
Tuple2<String, Integer>>)
window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String,
Integer>> operator =
transform.getOperator();
- assertThat(operator).isInstanceOf(WindowOperator.class);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
- (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>)
operator;
- assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
-
assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class);
-
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ReducingStateDescriptor.class);
+ assertThat(((AbstractStreamOperator<?>)
operator).isAsyncKeyOrderedProcessingEnabled())
+ .isEqualTo(enableAsyncState);
+
+ KeySelector<Tuple2<String, Integer>, String> keySelector;
+ if (enableAsyncState) {
+ assertThat(operator).isInstanceOf(AsyncWindowOperator.class);
+ AsyncWindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>
winOperator =
+ (AsyncWindowOperator<String, Tuple2<String, Integer>, ?,
?, ?>) operator;
+
assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class);
+
assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class);
+ assertThat(winOperator.getStateDescriptor())
+ .isInstanceOf(
+
org.apache.flink.api.common.state.v2.ReducingStateDescriptor.class);
+
+ keySelector = winOperator.getKeySelector();
+ } else {
+ assertThat(operator).isInstanceOf(WindowOperator.class);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>
winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>)
operator;
+
assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
+
assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class);
+ assertThat(winOperator.getStateDescriptor())
+ .isInstanceOf(ReducingStateDescriptor.class);
+
+ keySelector = winOperator.getKeySelector();
+ }
processElementAndEnsureOutput(
- winOperator,
- winOperator.getKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO,
- new Tuple2<>("hello", 1));
+ operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new
Tuple2<>("hello", 1));
}
- @Test
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
@SuppressWarnings("rawtypes")
- void testApplyWithCustomTrigger() throws Exception {
+ void testApplyWithCustomTrigger(boolean enableAsyncState) throws Exception
{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source =
env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
String, TimeWindow>
+ windowFunc =
+ new WindowFunction<>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ String key,
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ };
DataStream<Tuple2<String, Integer>> window1 =
- source.keyBy(new TupleKeySelector())
-
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
- .trigger(CountTrigger.of(1))
- .apply(
- new WindowFunction<
- Tuple2<String, Integer>,
- Tuple2<String, Integer>,
- String,
- TimeWindow>() {
- private static final long serialVersionUID
= 1L;
-
- @Override
- public void apply(
- String key,
- TimeWindow window,
- Iterable<Tuple2<String, Integer>>
values,
- Collector<Tuple2<String, Integer>>
out)
- throws Exception {
- for (Tuple2<String, Integer> in :
values) {
- out.collect(in);
- }
- }
- });
+ enableAsyncState
+ ? source.keyBy(new TupleKeySelector())
+
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+ .trigger(AsyncCountTrigger.of(1))
+ .apply(windowFunc)
+ : source.keyBy(new TupleKeySelector())
+
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+ .trigger(CountTrigger.of(1))
+ .apply(windowFunc);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String,
Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>,
Tuple2<String, Integer>>)
window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String,
Integer>> operator =
transform.getOperator();
- assertThat(operator).isInstanceOf(WindowOperator.class);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
- (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>)
operator;
- assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
-
assertThat(winOperator.getWindowAssigner()).isInstanceOf(TumblingEventTimeWindows.class);
-
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+ assertThat(((AbstractStreamOperator<?>)
operator).isAsyncKeyOrderedProcessingEnabled())
+ .isEqualTo(enableAsyncState);
+
+ KeySelector<Tuple2<String, Integer>, String> keySelector;
+ if (enableAsyncState) {
+ assertThat(operator).isInstanceOf(AsyncWindowOperator.class);
+ AsyncWindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>
winOperator =
+ (AsyncWindowOperator<String, Tuple2<String, Integer>, ?,
?, ?>) operator;
+
assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class);
+ assertThat(winOperator.getWindowAssigner())
+ .isInstanceOf(TumblingEventTimeWindows.class);
+ assertThat(winOperator.getStateDescriptor())
+
.isInstanceOf(org.apache.flink.api.common.state.v2.ListStateDescriptor.class);
+
+ keySelector = winOperator.getKeySelector();
+ } else {
+ assertThat(operator).isInstanceOf(WindowOperator.class);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>
winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>)
operator;
+
assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
+ assertThat(winOperator.getWindowAssigner())
+ .isInstanceOf(TumblingEventTimeWindows.class);
+
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+
+ keySelector = winOperator.getKeySelector();
+ }
processElementAndEnsureOutput(
- winOperator,
- winOperator.getKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO,
- new Tuple2<>("hello", 1));
+ operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new
Tuple2<>("hello", 1));
}
- @Test
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
@SuppressWarnings("rawtypes")
- void testProcessWithCustomTrigger() throws Exception {
+ void testProcessWithCustomTrigger(boolean enableAsyncState) throws
Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source =
env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String,
Integer>, String, TimeWindow>
+ windowFunc =
+ new ProcessWindowFunction<>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out)
+ throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ };
DataStream<Tuple2<String, Integer>> window1 =
- source.keyBy(new TupleKeySelector())
-
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
- .trigger(CountTrigger.of(1))
- .process(
- new ProcessWindowFunction<
- Tuple2<String, Integer>,
- Tuple2<String, Integer>,
- String,
- TimeWindow>() {
- private static final long serialVersionUID
= 1L;
-
- @Override
- public void process(
- String key,
- Context ctx,
- Iterable<Tuple2<String, Integer>>
values,
- Collector<Tuple2<String, Integer>>
out)
- throws Exception {
- for (Tuple2<String, Integer> in :
values) {
- out.collect(in);
- }
- }
- });
+ enableAsyncState
+ ? source.keyBy(new TupleKeySelector())
+
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+ .trigger(AsyncCountTrigger.of(1))
+ .process(windowFunc)
+ : source.keyBy(new TupleKeySelector())
+
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+ .trigger(CountTrigger.of(1))
+ .process(windowFunc);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String,
Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>,
Tuple2<String, Integer>>)
window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String,
Integer>> operator =
transform.getOperator();
- assertThat(operator).isInstanceOf(WindowOperator.class);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
- (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>)
operator;
- assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
-
assertThat(winOperator.getWindowAssigner()).isInstanceOf(TumblingEventTimeWindows.class);
-
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+ assertThat(((AbstractStreamOperator<?>)
operator).isAsyncKeyOrderedProcessingEnabled())
+ .isEqualTo(enableAsyncState);
+
+ KeySelector<Tuple2<String, Integer>, String> keySelector;
+ if (enableAsyncState) {
+ assertThat(operator).isInstanceOf(AsyncWindowOperator.class);
+ AsyncWindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>
winOperator =
+ (AsyncWindowOperator<String, Tuple2<String, Integer>, ?,
?, ?>) operator;
+
assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class);
+ assertThat(winOperator.getWindowAssigner())
+ .isInstanceOf(TumblingEventTimeWindows.class);
+ assertThat(winOperator.getStateDescriptor())
+
.isInstanceOf(org.apache.flink.api.common.state.v2.ListStateDescriptor.class);
+
+ keySelector = winOperator.getKeySelector();
+ } else {
+ assertThat(operator).isInstanceOf(WindowOperator.class);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>
winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>)
operator;
+
assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
+ assertThat(winOperator.getWindowAssigner())
+ .isInstanceOf(TumblingEventTimeWindows.class);
+
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+
+ keySelector = winOperator.getKeySelector();
+ }
processElementAndEnsureOutput(
- winOperator,
- winOperator.getKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO,
- new Tuple2<>("hello", 1));
+ operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new
Tuple2<>("hello", 1));
}
@Test
@@ -1486,9 +1558,14 @@ class WindowTranslationTest {
TypeInformation<K> keyType,
IN element)
throws Exception {
-
+ boolean enableAsyncState =
+ ((AbstractStreamOperator<?>)
operator).isAsyncKeyOrderedProcessingEnabled();
KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator,
keySelector, keyType);
+ enableAsyncState
+ ? AsyncKeyedOneInputStreamOperatorTestHarness.create(
+ operator, keySelector, keyType)
+ : new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, keySelector, keyType);
if (operator instanceof OutputTypeConfigurable) {
// use a dummy type since window functions just need the
ExecutionConfig