This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 58a849bcc49 [FLINK-31996] Chaining operators with different max parallelism prevents rescaling
58a849bcc49 is described below

commit 58a849bcc497e17bf575e49946f106030f3e1a1a
Author: David Moravek <d...@apache.org>
AuthorDate: Thu Jul 6 15:43:09 2023 +0200

    [FLINK-31996] Chaining operators with different max parallelism prevents rescaling
    
    * [FLINK-31996][flink-streaming-java] Introduce a new opt-in flag that can disable the chaining of operators with different max parallelism; such chaining can prevent rescaling in some cases when the AdaptiveScheduler is used.
    
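    - We introduce the new `pipeline.operator-chaining.chain-operators-with-different-max-parallelism` flag. It defaults to `true` (current behavior); setting it to `false` opts in to breaking chains between operators with different max parallelism.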
    - We deprecate the `pipeline.operator-chaining` flag and supersede it with `pipeline.operator-chaining.enabled` to provide a more consistent configuration experience.
    - The StreamingJobGraphGenerator and StreamGraphHasherV2 respect the new flag.
---
 .../generated/pipeline_configuration.html          |  8 +++++-
 .../flink/configuration/PipelineOptions.java       | 11 +++++++-
 .../datastream/stream_execution_environment.py     |  9 ++++++
 .../environment/StreamExecutionEnvironment.java    | 16 ++++++++++-
 .../flink/streaming/api/graph/StreamGraph.java     | 11 ++++++++
 .../streaming/api/graph/StreamGraphGenerator.java  | 11 ++++++++
 .../api/graph/StreamingJobGraphGenerator.java      | 15 +++++++---
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 33 ++++++++++++++++++++++
 .../scala/StreamingScalaAPICompletenessTest.scala  |  2 ++
 9 files changed, 109 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html 
b/docs/layouts/shortcodes/generated/pipeline_configuration.html
index f2b8b660f5a..653a44e3dbe 100644
--- a/docs/layouts/shortcodes/generated/pipeline_configuration.html
+++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html
@@ -105,7 +105,13 @@
             <td>When enabled objects that Flink internally uses for 
deserialization and passing data to user-code functions will be reused. Keep in 
mind that this can lead to bugs when the user-code function of an operation is 
not aware of this behaviour.</td>
         </tr>
         <tr>
-            <td><h5>pipeline.operator-chaining</h5></td>
+            
<td><h5>pipeline.operator-chaining.chain-operators-with-different-max-parallelism</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Operators with different max parallelism can be chained 
together. Default behavior may prevent rescaling when the AdaptiveScheduler is 
used.</td>
+        </tr>
+        <tr>
+            <td><h5>pipeline.operator-chaining.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Operator chaining allows non-shuffle operations to be 
co-located in the same thread fully avoiding serialization and 
de-serialization.</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index f8bacdf494b..0cf07b06edb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -243,13 +243,22 @@ public class PipelineOptions {
                                     .build());
 
     public static final ConfigOption<Boolean> OPERATOR_CHAINING =
-            key("pipeline.operator-chaining")
+            key("pipeline.operator-chaining.enabled")
                     .booleanType()
                     .defaultValue(true)
+                    .withDeprecatedKeys("pipeline.operator-chaining")
                     .withDescription(
                             "Operator chaining allows non-shuffle operations 
to be co-located in the same thread "
                                     + "fully avoiding serialization and 
de-serialization.");
 
+    public static final ConfigOption<Boolean>
+            OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM =
+                    
key("pipeline.operator-chaining.chain-operators-with-different-max-parallelism")
+                            .booleanType()
+                            .defaultValue(true)
+                            .withDescription(
+                                    "Operators with different max parallelism 
can be chained together. Default behavior may prevent rescaling when the 
AdaptiveScheduler is used.");
+
     public static final ConfigOption<List<String>> CACHED_FILES =
             key("pipeline.cached-files")
                     .stringType()
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py 
b/flink-python/pyflink/datastream/stream_execution_environment.py
index ad78ad280ce..1c3c3f43e8b 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -213,6 +213,15 @@ class StreamExecutionEnvironment(object):
         """
         return self._j_stream_execution_environment.isChainingEnabled()
 
+    def is_chaining_of_operators_with_different_max_parallelism_enabled(self) 
-> bool:
+        """
+        Returns whether operators that have a different max parallelism can be 
chained.
+
+        :return: True if chaining is enabled, false otherwise
+        """
+        return self._j_stream_execution_environment\
+            .isChainingOfOperatorsWithDifferentMaxParallelismEnabled()
+
     def get_checkpoint_config(self) -> CheckpointConfig:
         """
         Gets the checkpoint config, which defines values like checkpoint 
interval, delay between
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5e168e9a8c5..b9cf3eb150c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -194,7 +194,9 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
 
     private long bufferTimeout = 
ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis();
 
-    protected boolean isChainingEnabled = true;
+    private boolean isChainingEnabled = true;
+
+    private boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled = 
true;
 
     /** The state backend used for storing k/v state and state snapshots. */
     private StateBackend defaultStateBackend;
@@ -475,6 +477,11 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         return isChainingEnabled;
     }
 
+    @PublicEvolving
+    public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() {
+        return isChainingOfOperatorsWithDifferentMaxParallelismEnabled;
+    }
+
     // ------------------------------------------------------------------------
     //  Checkpointing Settings
     // ------------------------------------------------------------------------
@@ -990,6 +997,11 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         configuration
                 .getOptional(PipelineOptions.OPERATOR_CHAINING)
                 .ifPresent(c -> this.isChainingEnabled = c);
+        configuration
+                .getOptional(
+                        PipelineOptions
+                                
.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM)
+                .ifPresent(c -> 
this.isChainingOfOperatorsWithDifferentMaxParallelismEnabled = c);
         configuration
                 .getOptional(DeploymentOptions.JOB_LISTENERS)
                 .ifPresent(listeners -> registerCustomListeners(classLoader, 
listeners));
@@ -2299,6 +2311,8 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
                 .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
                 .setSavepointDir(defaultSavepointDirectory)
                 .setChaining(isChainingEnabled)
+                .setChainingOfOperatorsWithDifferentMaxParallelism(
+                        
isChainingOfOperatorsWithDifferentMaxParallelismEnabled)
                 .setUserArtifacts(cacheFile)
                 .setTimeCharacteristic(timeCharacteristic)
                 .setDefaultBufferTimeout(bufferTimeout)
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 3add906c578..8727df8777b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -103,6 +103,7 @@ public class StreamGraph implements Pipeline {
     private SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.none();
 
     private boolean chaining;
+    private boolean chainingOfOperatorsWithDifferentMaxParallelism;
 
     private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
userArtifacts =
             Collections.emptyList();
@@ -196,6 +197,12 @@ public class StreamGraph implements Pipeline {
         this.chaining = chaining;
     }
 
+    public void setChainingOfOperatorsWithDifferentMaxParallelism(
+            boolean chainingOfOperatorsWithDifferentMaxParallelism) {
+        this.chainingOfOperatorsWithDifferentMaxParallelism =
+                chainingOfOperatorsWithDifferentMaxParallelism;
+    }
+
     public void setStateBackend(StateBackend backend) {
         this.stateBackend = backend;
     }
@@ -310,6 +317,10 @@ public class StreamGraph implements Pipeline {
         return chaining;
     }
 
+    public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() {
+        return chainingOfOperatorsWithDifferentMaxParallelism;
+    }
+
     public boolean isIterative() {
         return !vertexIDtoLoopTimeout.isEmpty();
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 3859c2bb9b2..26a2ef6ede3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -170,6 +170,8 @@ public class StreamGraphGenerator {
 
     private boolean chaining = true;
 
+    private boolean chainingOfOperatorsWithDifferentMaxParallelism = true;
+
     private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
userArtifacts =
             Collections.emptyList();
 
@@ -269,6 +271,13 @@ public class StreamGraphGenerator {
         return this;
     }
 
+    public StreamGraphGenerator 
setChainingOfOperatorsWithDifferentMaxParallelism(
+            boolean chainingOfOperatorsWithDifferentMaxParallelism) {
+        this.chainingOfOperatorsWithDifferentMaxParallelism =
+                chainingOfOperatorsWithDifferentMaxParallelism;
+        return this;
+    }
+
     public StreamGraphGenerator setUserArtifacts(
             Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
userArtifacts) {
         this.userArtifacts = checkNotNull(userArtifacts);
@@ -360,6 +369,8 @@ public class StreamGraphGenerator {
         checkNotNull(graph);
 
         graph.setChaining(chaining);
+        graph.setChainingOfOperatorsWithDifferentMaxParallelism(
+                chainingOfOperatorsWithDifferentMaxParallelism);
         graph.setUserArtifacts(userArtifacts);
         graph.setTimeCharacteristic(timeCharacteristic);
         
graph.setVertexDescriptionMode(configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE));
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 3f818b6bd9a..bd88fb89954 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1512,12 +1512,11 @@ public class StreamingJobGraphGenerator {
         StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
         StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
-        if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
+        if (!(streamGraph.isChainingEnabled()
+                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                 && areOperatorsChainable(upStreamVertex, downStreamVertex, 
streamGraph)
                 && arePartitionerAndExchangeModeChainable(
-                        edge.getPartitioner(), edge.getExchangeMode(), 
streamGraph.isDynamic())
-                && upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism()
-                && streamGraph.isChainingEnabled())) {
+                        edge.getPartitioner(), edge.getExchangeMode(), 
streamGraph.isDynamic()))) {
 
             return false;
         }
@@ -1601,6 +1600,14 @@ public class StreamingJobGraphGenerator {
                         "Unknown chaining strategy: " + 
downStreamOperator.getChainingStrategy());
         }
 
+        // Only vertices with the same parallelism can be chained.
+        isChainable &= upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism();
+
+        if 
(!streamGraph.isChainingOfOperatorsWithDifferentMaxParallelismEnabled()) {
+            isChainable &=
+                    upStreamVertex.getMaxParallelism() == 
downStreamVertex.getMaxParallelism();
+        }
+
         return isChainable;
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 788cc84e1ca..eeca72d60c8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -1180,6 +1180,39 @@ class StreamingJobGraphGeneratorTest {
         assertThat(vertices.get(1).getOperatorIDs()).hasSize(5);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testChainingOfOperatorsWithDifferentMaxParallelism(
+            boolean chainingOfOperatorsWithDifferentMaxParallelismEnabled) 
throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(
+                
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
+                chainingOfOperatorsWithDifferentMaxParallelismEnabled);
+        configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+        try (StreamExecutionEnvironment chainEnv =
+                StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
+            chainEnv.fromElements(1)
+                    .map(x -> x)
+                    // should automatically break chain here
+                    .map(x -> x)
+                    .setMaxParallelism(1)
+                    .map(x -> x);
+
+            final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
+
+            final List<JobVertex> vertices = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+            if (chainingOfOperatorsWithDifferentMaxParallelismEnabled) {
+                assertThat(vertices).hasSize(1);
+                assertThat(vertices.get(0).getOperatorIDs()).hasSize(4);
+            } else {
+                assertThat(vertices).hasSize(3);
+                assertThat(vertices.get(0).getOperatorIDs()).hasSize(2);
+                assertThat(vertices.get(1).getOperatorIDs()).hasSize(1);
+                assertThat(vertices.get(1).getOperatorIDs()).hasSize(1);
+            }
+        }
+    }
+
     /**
      * Tests that {@link 
org.apache.flink.streaming.api.operators.YieldingOperatorFactory} are
      * chained to new sources, see FLINK-20444.
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 22f12738a0a..fd1a2a607ca 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -53,6 +53,8 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
       
"org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
       "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",
       
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",
+      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." 
+
+        "isChainingOfOperatorsWithDifferentMaxParallelismEnabled",
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." 
+
         "getStateHandleProvider",
       
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getCheckpointInterval",

Reply via email to