This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 98000fbe598 [FLINK-37623][datastream] Async state support for process() in Datastream API (#26419)
98000fbe598 is described below
commit 98000fbe598ca20eedf6411672509af9624a58ac
Author: Yanfei Lei <[email protected]>
AuthorDate: Fri Apr 11 10:21:51 2025 +0800
[FLINK-37623][datastream] Async state support for process() in Datastream API (#26419)
---
.../streaming/api/datastream/KeyedStream.java | 12 +++---
.../apache/flink/streaming/api/DataStreamTest.java | 44 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index f082f11b8e6..e99f390276e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -35,6 +35,7 @@ import
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import
org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
import
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -358,9 +359,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
-
- KeyedProcessOperator<KEY, T, R> operator =
- new KeyedProcessOperator<>(clean(keyedProcessFunction));
+ OneInputStreamOperator<T, R> operator =
+ isEnableAsyncState()
+ ? new
AsyncKeyedProcessOperator<>(clean(keyedProcessFunction))
+ : new
KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
@@ -370,9 +372,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
@Override
public <R> SingleOutputStreamOperator<R> flatMap(
FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
- OneInputStreamOperator operator =
+ OneInputStreamOperator<T, R> operator =
isEnableAsyncState()
- ? new AsyncStreamFlatMap(clean(flatMapper))
+ ? new AsyncStreamFlatMap<>(clean(flatMapper))
: new StreamFlatMap<>(clean(flatMapper));
return transform("Flat Map", outputType, operator);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index c02464b9557..747cec98067 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import
org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -934,6 +936,48 @@ class DataStreamTest {
assertThat(getOperatorForDataStream(processed)).isInstanceOf(KeyedProcessOperator.class);
}
+ /**
+ * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated
+ * to an async operator.
+ */
+ @Test
+ void testAsyncKeyedStreamKeyedProcessTranslation() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
+
+ KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
+ new KeyedProcessFunction<Long, Long, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(Long value, Context ctx,
Collector<Integer> out)
+ throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Integer> out)
+ throws Exception {
+ // Do nothing
+ }
+ };
+
+ DataStream<Integer> processed =
+ src.keyBy(new IdentityKeySelector<Long>())
+ .enableAsyncState()
+ .process(keyedProcessFunction);
+
+ processed.sinkTo(new DiscardingSink<Integer>());
+
+ assertThat(
+ ((AbstractAsyncStateUdfStreamOperator<?, ?>)
+ getOperatorForDataStream(processed))
+ .getUserFunction())
+ .isEqualTo(keyedProcessFunction);
+ assertThat(getOperatorForDataStream(processed))
+ .isInstanceOf(AsyncKeyedProcessOperator.class);
+ }
+
/**
* Verify that a {@link DataStream#process(ProcessFunction)} call is
correctly translated to an
* operator.