This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 14e85eced10 [FLINK-37458][datastream] Forbid enableAsyncState() for
synchronous operators (#26395)
14e85eced10 is described below
commit 14e85eced10e98bc75870ac0360bad67d0722697
Author: Yanfei Lei <[email protected]>
AuthorDate: Tue Apr 8 12:13:58 2025 +0800
[FLINK-37458][datastream] Forbid enableAsyncState() for synchronous
operators (#26395)
---
.../transformations/OneInputTransformation.java | 8 +++++-
.../api/graph/JobGraphGeneratorTestBase.java | 29 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
index edae579c923..13a8ead51ee 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -27,6 +27,7 @@ import
org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
@@ -195,6 +196,11 @@ public class OneInputTransformation<IN, OUT> extends
PhysicalTransformation<OUT>
@Override
public void enableAsyncState() {
- // nothing to do.
+ OneInputStreamOperator<IN, OUT> operator =
+ (OneInputStreamOperator<IN, OUT>)
+ ((SimpleOperatorFactory<OUT>)
operatorFactory).getOperator();
+ if (!(operator instanceof AsyncStateProcessingOperator)) {
+ super.enableAsyncState();
+ }
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
index a29164c51b6..48bf37c409c 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
@@ -119,6 +120,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
@@ -155,6 +157,7 @@ import static
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
/**
* Tests for {@link StreamingJobGraphGenerator} and {@link
AdaptiveGraphManager}.
@@ -2163,6 +2166,32 @@ abstract class JobGraphGeneratorTestBase {
new TestingOutputFormatSupportConcurrentExecutionAttempts<>(),
true);
}
+ @Test
+ void testEnableAsyncStateForSyncOperatorThrowException() throws Exception {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new
Configuration());
+ try {
+ env.fromData(1, 2, 3, 4, 5)
+ .keyBy(k -> k)
+ .flatMap(
+ new FlatMapFunction<Integer, Integer>() {
+ @Override
+ public void flatMap(Integer value,
Collector<Integer> out)
+ throws Exception {
+ out.collect(value);
+ }
+ })
+ .enableAsyncState()
+ .print();
+ fail("Enabling async state for synchronous operators is
forbidden.");
+ } catch (UnsupportedOperationException e) {
+ assertThat(e.getMessage())
+ .isEqualTo(
+ "The transformation does not support "
+ + "async state, or you are enabling the
async state without a keyed context (not behind a keyBy()).");
+ }
+ }
+
private void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
OutputFormat<Integer> outputFormat, boolean isSupported) {
final StreamExecutionEnvironment env =