This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 8c8667e0166 [FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
8c8667e0166 is described below
commit 8c8667e0166a9385bb8a48b09e159848b52d52da
Author: Yanfei Lei <[email protected]>
AuthorDate: Tue Apr 15 10:39:15 2025 +0800
[FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
---
.../streaming/api/datastream/KeyedStream.java | 12 +++---
.../apache/flink/streaming/api/DataStreamTest.java | 44 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index dd4818fe30f..92b01f65c0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import
org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
+import
org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -358,9 +359,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
-
- KeyedProcessOperator<KEY, T, R> operator =
- new KeyedProcessOperator<>(clean(keyedProcessFunction));
+ OneInputStreamOperator<T, R> operator =
+ isEnableAsyncState()
+ ? new
AsyncKeyedProcessOperator<>(clean(keyedProcessFunction))
+ : new
KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
@@ -370,9 +372,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
@Override
public <R> SingleOutputStreamOperator<R> flatMap(
FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
- OneInputStreamOperator operator =
+ OneInputStreamOperator<T, R> operator =
isEnableAsyncState()
- ? new AsyncStreamFlatMap(clean(flatMapper))
+ ? new AsyncStreamFlatMap<>(clean(flatMapper))
: new StreamFlatMap<>(clean(flatMapper));
return transform("Flat Map", outputType, operator);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index c02464b9557..747cec98067 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import
org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -934,6 +936,48 @@ class DataStreamTest {
assertThat(getOperatorForDataStream(processed)).isInstanceOf(KeyedProcessOperator.class);
}
+ /**
+ * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is
correctly translated
+ * to an async operator.
+ */
+ @Test
+ void testAsyncKeyedStreamKeyedProcessTranslation() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
+
+ KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
+ new KeyedProcessFunction<Long, Long, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(Long value, Context ctx,
Collector<Integer> out)
+ throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Integer> out)
+ throws Exception {
+ // Do nothing
+ }
+ };
+
+ DataStream<Integer> processed =
+ src.keyBy(new IdentityKeySelector<Long>())
+ .enableAsyncState()
+ .process(keyedProcessFunction);
+
+ processed.sinkTo(new DiscardingSink<Integer>());
+
+ assertThat(
+ ((AbstractAsyncStateUdfStreamOperator<?, ?>)
+ getOperatorForDataStream(processed))
+ .getUserFunction())
+ .isEqualTo(keyedProcessFunction);
+ assertThat(getOperatorForDataStream(processed))
+ .isInstanceOf(AsyncKeyedProcessOperator.class);
+ }
+
/**
* Verify that a {@link DataStream#process(ProcessFunction)} call is
correctly translated to an
* operator.