This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 932f17da6cb [FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26283)
932f17da6cb is described below

commit 932f17da6cbc08699b70ae22768726ea97e7ba18
Author: Yanfei Lei <[email protected]>
AuthorDate: Wed Mar 12 19:58:06 2025 +0800

    [FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26283)
---
 .../transformations/OneInputTransformation.java    |  8 +++++-
 .../api/graph/JobGraphGeneratorTestBase.java       | 29 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
index e8301e23ca8..b1bb7ee1b5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
 
@@ -195,6 +196,11 @@ public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT>
 
     @Override
     public void enableAsyncState() {
-        // nothing to do.
+        OneInputStreamOperator<IN, OUT> operator =
+                (OneInputStreamOperator<IN, OUT>)
+                        ((SimpleOperatorFactory<OUT>) operatorFactory).getOperator();
+        if (!(operator instanceof AsyncStateProcessingOperator)) {
+            super.enableAsyncState();
+        }
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
index f9de7c5a20e..dd29081dd6e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.InputFormat;
@@ -119,6 +120,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
@@ -155,6 +157,7 @@ import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Tests for {@link StreamingJobGraphGenerator} and {@link AdaptiveGraphManager}.
@@ -2163,6 +2166,32 @@ abstract class JobGraphGeneratorTestBase {
                 new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true);
     }
 
+    @Test
+    void testEnableAsyncStateForSyncOperatorThrowException() throws Exception {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+        try {
+            env.fromData(1, 2, 3, 4, 5)
+                    .keyBy(k -> k)
+                    .flatMap(
+                            new FlatMapFunction<Integer, Integer>() {
+                                @Override
+                                public void flatMap(Integer value, Collector<Integer> out)
+                                        throws Exception {
+                                    out.collect(value);
+                                }
+                            })
+                    .enableAsyncState()
+                    .print();
+            fail("Enabling async state for synchronous operators is forbidden.");
+        } catch (UnsupportedOperationException e) {
+            assertThat(e.getMessage())
+                    .isEqualTo(
+                            "The transformation does not support "
+                                    + "async state, or you are enabling the async state without a keyed context (not behind a keyBy()).");
+        }
+    }
+
     private void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
             OutputFormat<Integer> outputFormat, boolean isSupported) {
         final StreamExecutionEnvironment env =

Reply via email to