This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dc3220b71a [FLINK-38363][Runtime] WindowedStream.trigger supports trigger with async state (#27021)
0dc3220b71a is described below

commit 0dc3220b71ab276e070f0c63f131025998fb708d
Author: xia rui <[email protected]>
AuthorDate: Tue Sep 23 23:31:35 2025 +0800

    [FLINK-38363][Runtime] WindowedStream.trigger supports trigger with async state (#27021)
---
 .../streaming/api/datastream/WindowedStream.java   |  14 ++
 .../operators/windowing/WindowTranslationTest.java | 257 +++++++++++++--------
 2 files changed, 181 insertions(+), 90 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index a05d3293003..11dc99f7e6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -102,6 +103,19 @@ public class WindowedStream<T, K, W extends Window> {
         return this;
     }
 
+    /**
+     * Sets the {@code AsyncTrigger} that should be used to trigger window emission.
+     *
+     * <p>Will automatically enable async state for {@code WindowedStream}.
+     */
+    @Experimental
+    public WindowedStream<T, K, W> trigger(AsyncTrigger<? super T, ? super W> trigger) {
+        enableAsyncState();
+
+        builder.asyncTrigger(trigger);
+        return this;
+    }
+
     /**
      * Sets the time by which elements are allowed to be late. Elements that 
arrive behind the
      * watermark by more than the specified time will be dropped. By default, 
the allowed lateness
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 28cc3b570af..ef0c9ef8053 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,11 +31,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -54,9 +57,12 @@ import 
org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 
@@ -1059,9 +1065,10 @@ class WindowTranslationTest {
                 new Tuple2<>("hello", 1));
     }
 
-    @Test
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
     @SuppressWarnings("rawtypes")
-    void testReduceWithCustomTrigger() throws Exception {
+    void testReduceWithCustomTrigger(boolean enableAsyncState) throws 
Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Tuple2<String, Integer>> source =
@@ -1070,134 +1077,199 @@ class WindowTranslationTest {
         DummyReducer reducer = new DummyReducer();
 
         DataStream<Tuple2<String, Integer>> window1 =
-                source.keyBy(x -> x.f0)
-                        .window(
-                                SlidingEventTimeWindows.of(
-                                        Duration.ofSeconds(1), 
Duration.ofMillis(100)))
-                        .trigger(CountTrigger.of(1))
-                        .reduce(reducer);
+                enableAsyncState
+                        ? source.keyBy(x -> x.f0)
+                                .window(
+                                        SlidingEventTimeWindows.of(
+                                                Duration.ofSeconds(1), 
Duration.ofMillis(100)))
+                                .trigger(AsyncCountTrigger.of(1))
+                                .reduce(reducer)
+                        : source.keyBy(x -> x.f0)
+                                .window(
+                                        SlidingEventTimeWindows.of(
+                                                Duration.ofSeconds(1), 
Duration.ofMillis(100)))
+                                .trigger(CountTrigger.of(1))
+                                .reduce(reducer);
 
         OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
                 (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>)
                         window1.getTransformation();
         OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator =
                 transform.getOperator();
-        assertThat(operator).isInstanceOf(WindowOperator.class);
-        WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-                (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
-        assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
-        
assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class);
-        
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ReducingStateDescriptor.class);
+        assertThat(((AbstractStreamOperator<?>) 
operator).isAsyncKeyOrderedProcessingEnabled())
+                .isEqualTo(enableAsyncState);
+
+        KeySelector<Tuple2<String, Integer>, String> keySelector;
+        if (enableAsyncState) {
+            assertThat(operator).isInstanceOf(AsyncWindowOperator.class);
+            AsyncWindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                    (AsyncWindowOperator<String, Tuple2<String, Integer>, ?, 
?, ?>) operator;
+            
assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class);
+            
assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class);
+            assertThat(winOperator.getStateDescriptor())
+                    .isInstanceOf(
+                            
org.apache.flink.api.common.state.v2.ReducingStateDescriptor.class);
+
+            keySelector = winOperator.getKeySelector();
+        } else {
+            assertThat(operator).isInstanceOf(WindowOperator.class);
+            WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                    (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+            
assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
+            
assertThat(winOperator.getWindowAssigner()).isInstanceOf(SlidingEventTimeWindows.class);
+            assertThat(winOperator.getStateDescriptor())
+                    .isInstanceOf(ReducingStateDescriptor.class);
+
+            keySelector = winOperator.getKeySelector();
+        }
 
         processElementAndEnsureOutput(
-                winOperator,
-                winOperator.getKeySelector(),
-                BasicTypeInfo.STRING_TYPE_INFO,
-                new Tuple2<>("hello", 1));
+                operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
     }
 
-    @Test
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
     @SuppressWarnings("rawtypes")
-    void testApplyWithCustomTrigger() throws Exception {
+    void testApplyWithCustomTrigger(boolean enableAsyncState) throws Exception 
{
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Tuple2<String, Integer>> source =
                 env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+        WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, 
String, TimeWindow>
+                windowFunc =
+                        new WindowFunction<>() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public void apply(
+                                    String key,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<String, Integer>> values,
+                                    Collector<Tuple2<String, Integer>> out) {
+                                for (Tuple2<String, Integer> in : values) {
+                                    out.collect(in);
+                                }
+                            }
+                        };
 
         DataStream<Tuple2<String, Integer>> window1 =
-                source.keyBy(new TupleKeySelector())
-                        
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
-                        .trigger(CountTrigger.of(1))
-                        .apply(
-                                new WindowFunction<
-                                        Tuple2<String, Integer>,
-                                        Tuple2<String, Integer>,
-                                        String,
-                                        TimeWindow>() {
-                                    private static final long serialVersionUID 
= 1L;
-
-                                    @Override
-                                    public void apply(
-                                            String key,
-                                            TimeWindow window,
-                                            Iterable<Tuple2<String, Integer>> 
values,
-                                            Collector<Tuple2<String, Integer>> 
out)
-                                            throws Exception {
-                                        for (Tuple2<String, Integer> in : 
values) {
-                                            out.collect(in);
-                                        }
-                                    }
-                                });
+                enableAsyncState
+                        ? source.keyBy(new TupleKeySelector())
+                                
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+                                .trigger(AsyncCountTrigger.of(1))
+                                .apply(windowFunc)
+                        : source.keyBy(new TupleKeySelector())
+                                
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+                                .trigger(CountTrigger.of(1))
+                                .apply(windowFunc);
 
         OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
                 (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>)
                         window1.getTransformation();
         OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator =
                 transform.getOperator();
-        assertThat(operator).isInstanceOf(WindowOperator.class);
-        WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-                (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
-        assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
-        
assertThat(winOperator.getWindowAssigner()).isInstanceOf(TumblingEventTimeWindows.class);
-        
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+        assertThat(((AbstractStreamOperator<?>) 
operator).isAsyncKeyOrderedProcessingEnabled())
+                .isEqualTo(enableAsyncState);
+
+        KeySelector<Tuple2<String, Integer>, String> keySelector;
+        if (enableAsyncState) {
+            assertThat(operator).isInstanceOf(AsyncWindowOperator.class);
+            AsyncWindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                    (AsyncWindowOperator<String, Tuple2<String, Integer>, ?, 
?, ?>) operator;
+            
assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class);
+            assertThat(winOperator.getWindowAssigner())
+                    .isInstanceOf(TumblingEventTimeWindows.class);
+            assertThat(winOperator.getStateDescriptor())
+                    
.isInstanceOf(org.apache.flink.api.common.state.v2.ListStateDescriptor.class);
+
+            keySelector = winOperator.getKeySelector();
+        } else {
+            assertThat(operator).isInstanceOf(WindowOperator.class);
+            WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                    (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+            
assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
+            assertThat(winOperator.getWindowAssigner())
+                    .isInstanceOf(TumblingEventTimeWindows.class);
+            
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+
+            keySelector = winOperator.getKeySelector();
+        }
 
         processElementAndEnsureOutput(
-                winOperator,
-                winOperator.getKeySelector(),
-                BasicTypeInfo.STRING_TYPE_INFO,
-                new Tuple2<>("hello", 1));
+                operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
     }
 
-    @Test
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
     @SuppressWarnings("rawtypes")
-    void testProcessWithCustomTrigger() throws Exception {
+    void testProcessWithCustomTrigger(boolean enableAsyncState) throws 
Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Tuple2<String, Integer>> source =
                 env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+        ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, 
Integer>, String, TimeWindow>
+                windowFunc =
+                        new ProcessWindowFunction<>() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public void process(
+                                    String key,
+                                    Context ctx,
+                                    Iterable<Tuple2<String, Integer>> values,
+                                    Collector<Tuple2<String, Integer>> out)
+                                    throws Exception {
+                                for (Tuple2<String, Integer> in : values) {
+                                    out.collect(in);
+                                }
+                            }
+                        };
 
         DataStream<Tuple2<String, Integer>> window1 =
-                source.keyBy(new TupleKeySelector())
-                        
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
-                        .trigger(CountTrigger.of(1))
-                        .process(
-                                new ProcessWindowFunction<
-                                        Tuple2<String, Integer>,
-                                        Tuple2<String, Integer>,
-                                        String,
-                                        TimeWindow>() {
-                                    private static final long serialVersionUID 
= 1L;
-
-                                    @Override
-                                    public void process(
-                                            String key,
-                                            Context ctx,
-                                            Iterable<Tuple2<String, Integer>> 
values,
-                                            Collector<Tuple2<String, Integer>> 
out)
-                                            throws Exception {
-                                        for (Tuple2<String, Integer> in : 
values) {
-                                            out.collect(in);
-                                        }
-                                    }
-                                });
+                enableAsyncState
+                        ? source.keyBy(new TupleKeySelector())
+                                
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+                                .trigger(AsyncCountTrigger.of(1))
+                                .process(windowFunc)
+                        : source.keyBy(new TupleKeySelector())
+                                
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
+                                .trigger(CountTrigger.of(1))
+                                .process(windowFunc);
 
         OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
                 (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>)
                         window1.getTransformation();
         OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator =
                 transform.getOperator();
-        assertThat(operator).isInstanceOf(WindowOperator.class);
-        WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-                (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
-        assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
-        
assertThat(winOperator.getWindowAssigner()).isInstanceOf(TumblingEventTimeWindows.class);
-        
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+        assertThat(((AbstractStreamOperator<?>) 
operator).isAsyncKeyOrderedProcessingEnabled())
+                .isEqualTo(enableAsyncState);
+
+        KeySelector<Tuple2<String, Integer>, String> keySelector;
+        if (enableAsyncState) {
+            assertThat(operator).isInstanceOf(AsyncWindowOperator.class);
+            AsyncWindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                    (AsyncWindowOperator<String, Tuple2<String, Integer>, ?, 
?, ?>) operator;
+            
assertThat(winOperator.getTrigger()).isInstanceOf(AsyncCountTrigger.class);
+            assertThat(winOperator.getWindowAssigner())
+                    .isInstanceOf(TumblingEventTimeWindows.class);
+            assertThat(winOperator.getStateDescriptor())
+                    
.isInstanceOf(org.apache.flink.api.common.state.v2.ListStateDescriptor.class);
+
+            keySelector = winOperator.getKeySelector();
+        } else {
+            assertThat(operator).isInstanceOf(WindowOperator.class);
+            WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
+                    (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+            
assertThat(winOperator.getTrigger()).isInstanceOf(CountTrigger.class);
+            assertThat(winOperator.getWindowAssigner())
+                    .isInstanceOf(TumblingEventTimeWindows.class);
+            
assertThat(winOperator.getStateDescriptor()).isInstanceOf(ListStateDescriptor.class);
+
+            keySelector = winOperator.getKeySelector();
+        }
 
         processElementAndEnsureOutput(
-                winOperator,
-                winOperator.getKeySelector(),
-                BasicTypeInfo.STRING_TYPE_INFO,
-                new Tuple2<>("hello", 1));
+                operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
     }
 
     @Test
@@ -1486,9 +1558,14 @@ class WindowTranslationTest {
             TypeInformation<K> keyType,
             IN element)
             throws Exception {
-
+        boolean enableAsyncState =
+                ((AbstractStreamOperator<?>) 
operator).isAsyncKeyOrderedProcessingEnabled();
         KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(operator, 
keySelector, keyType);
+                enableAsyncState
+                        ? AsyncKeyedOneInputStreamOperatorTestHarness.create(
+                                operator, keySelector, keyType)
+                        : new KeyedOneInputStreamOperatorTestHarness<>(
+                                operator, keySelector, keyType);
 
         if (operator instanceof OutputTypeConfigurable) {
             // use a dummy type since window functions just need the 
ExecutionConfig

Reply via email to