This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 58a849bcc49 [FLINK-31996] Chaining operators with different max parallelism prevents rescaling 58a849bcc49 is described below commit 58a849bcc497e17bf575e49946f106030f3e1a1a Author: David Moravek <d...@apache.org> AuthorDate: Thu Jul 6 15:43:09 2023 +0200 [FLINK-31996] Chaining operators with different max parallelism prevents rescaling * [FLINK-31996][flink-streaming-java] Introduce a new opt-in flag that can disable the chaining of operators with different max parallelism, which prevents rescaling in some cases when the AdaptiveScheduler is being used. - We introduce the new `pipeline.operator-chaining.chain-operators-with-different-max-parallelism` flag to opt in. - We deprecate the `pipeline.operator-chaining` flag and supersede it with `pipeline.operator-chaining.enabled` to provide a more consistent configuration experience. - The StreamingJobGraphGenerator and StreamGraphHasherV2 respect the new flag. --- .../generated/pipeline_configuration.html | 8 +++++- .../flink/configuration/PipelineOptions.java | 11 +++++++- .../datastream/stream_execution_environment.py | 9 ++++++ .../environment/StreamExecutionEnvironment.java | 16 ++++++++++- .../flink/streaming/api/graph/StreamGraph.java | 11 ++++++++ .../streaming/api/graph/StreamGraphGenerator.java | 11 ++++++++ .../api/graph/StreamingJobGraphGenerator.java | 15 +++++++--- .../api/graph/StreamingJobGraphGeneratorTest.java | 33 ++++++++++++++++++++++ .../scala/StreamingScalaAPICompletenessTest.scala | 2 ++ 9 files changed, 109 insertions(+), 7 deletions(-) diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html b/docs/layouts/shortcodes/generated/pipeline_configuration.html index f2b8b660f5a..653a44e3dbe 100644 --- a/docs/layouts/shortcodes/generated/pipeline_configuration.html +++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html @@ -105,7 +105,13 @@ <td>When enabled objects that Flink internally uses for 
deserialization and passing data to user-code functions will be reused. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behaviour.</td> </tr> <tr> - <td><h5>pipeline.operator-chaining</h5></td> + <td><h5>pipeline.operator-chaining.chain-operators-with-different-max-parallelism</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Operators with different max parallelism can be chained together. Default behavior may prevent rescaling when the AdaptiveScheduler is used.</td> + </tr> + <tr> + <td><h5>pipeline.operator-chaining.enabled</h5></td> <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td>Operator chaining allows non-shuffle operations to be co-located in the same thread fully avoiding serialization and de-serialization.</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index f8bacdf494b..0cf07b06edb 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -243,13 +243,22 @@ public class PipelineOptions { .build()); public static final ConfigOption<Boolean> OPERATOR_CHAINING = - key("pipeline.operator-chaining") + key("pipeline.operator-chaining.enabled") .booleanType() .defaultValue(true) + .withDeprecatedKeys("pipeline.operator-chaining") .withDescription( "Operator chaining allows non-shuffle operations to be co-located in the same thread " + "fully avoiding serialization and de-serialization."); + public static final ConfigOption<Boolean> + OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM = + key("pipeline.operator-chaining.chain-operators-with-different-max-parallelism") + .booleanType() + .defaultValue(true) + .withDescription( + "Operators with different max parallelism can be chained together. 
Default behavior may prevent rescaling when the AdaptiveScheduler is used."); + public static final ConfigOption<List<String>> CACHED_FILES = key("pipeline.cached-files") .stringType() diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index ad78ad280ce..1c3c3f43e8b 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -213,6 +213,15 @@ class StreamExecutionEnvironment(object): """ return self._j_stream_execution_environment.isChainingEnabled() + def is_chaining_of_operators_with_different_max_parallelism_enabled(self) -> bool: + """ + Returns whether operators that have a different max parallelism can be chained. + + :return: True if chaining is enabled, false otherwise + """ + return self._j_stream_execution_environment\ + .isChainingOfOperatorsWithDifferentMaxParallelismEnabled() + def get_checkpoint_config(self) -> CheckpointConfig: """ Gets the checkpoint config, which defines values like checkpoint interval, delay between diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 5e168e9a8c5..b9cf3eb150c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -194,7 +194,9 @@ public class StreamExecutionEnvironment implements AutoCloseable { private long bufferTimeout = ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis(); - protected boolean isChainingEnabled = true; + private boolean isChainingEnabled = true; + + private boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled = true; /** The state 
backend used for storing k/v state and state snapshots. */ private StateBackend defaultStateBackend; @@ -475,6 +477,11 @@ public class StreamExecutionEnvironment implements AutoCloseable { return isChainingEnabled; } + @PublicEvolving + public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() { + return isChainingOfOperatorsWithDifferentMaxParallelismEnabled; + } + // ------------------------------------------------------------------------ // Checkpointing Settings // ------------------------------------------------------------------------ @@ -990,6 +997,11 @@ public class StreamExecutionEnvironment implements AutoCloseable { configuration .getOptional(PipelineOptions.OPERATOR_CHAINING) .ifPresent(c -> this.isChainingEnabled = c); + configuration + .getOptional( + PipelineOptions + .OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM) + .ifPresent(c -> this.isChainingOfOperatorsWithDifferentMaxParallelismEnabled = c); configuration .getOptional(DeploymentOptions.JOB_LISTENERS) .ifPresent(listeners -> registerCustomListeners(classLoader, listeners)); @@ -2299,6 +2311,8 @@ public class StreamExecutionEnvironment implements AutoCloseable { .setChangelogStateBackendEnabled(changelogStateBackendEnabled) .setSavepointDir(defaultSavepointDirectory) .setChaining(isChainingEnabled) + .setChainingOfOperatorsWithDifferentMaxParallelism( + isChainingOfOperatorsWithDifferentMaxParallelismEnabled) .setUserArtifacts(cacheFile) .setTimeCharacteristic(timeCharacteristic) .setDefaultBufferTimeout(bufferTimeout) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 3add906c578..8727df8777b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -103,6 +103,7 @@ public 
class StreamGraph implements Pipeline { private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none(); private boolean chaining; + private boolean chainingOfOperatorsWithDifferentMaxParallelism; private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts = Collections.emptyList(); @@ -196,6 +197,12 @@ public class StreamGraph implements Pipeline { this.chaining = chaining; } + public void setChainingOfOperatorsWithDifferentMaxParallelism( + boolean chainingOfOperatorsWithDifferentMaxParallelism) { + this.chainingOfOperatorsWithDifferentMaxParallelism = + chainingOfOperatorsWithDifferentMaxParallelism; + } + public void setStateBackend(StateBackend backend) { this.stateBackend = backend; } @@ -310,6 +317,10 @@ public class StreamGraph implements Pipeline { return chaining; } + public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() { + return chainingOfOperatorsWithDifferentMaxParallelism; + } + public boolean isIterative() { return !vertexIDtoLoopTimeout.isEmpty(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 3859c2bb9b2..26a2ef6ede3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -170,6 +170,8 @@ public class StreamGraphGenerator { private boolean chaining = true; + private boolean chainingOfOperatorsWithDifferentMaxParallelism = true; + private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts = Collections.emptyList(); @@ -269,6 +271,13 @@ public class StreamGraphGenerator { return this; } + public StreamGraphGenerator setChainingOfOperatorsWithDifferentMaxParallelism( + boolean 
chainingOfOperatorsWithDifferentMaxParallelism) { + this.chainingOfOperatorsWithDifferentMaxParallelism = + chainingOfOperatorsWithDifferentMaxParallelism; + return this; + } + public StreamGraphGenerator setUserArtifacts( Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts) { this.userArtifacts = checkNotNull(userArtifacts); @@ -360,6 +369,8 @@ public class StreamGraphGenerator { checkNotNull(graph); graph.setChaining(chaining); + graph.setChainingOfOperatorsWithDifferentMaxParallelism( + chainingOfOperatorsWithDifferentMaxParallelism); graph.setUserArtifacts(userArtifacts); graph.setTimeCharacteristic(timeCharacteristic); graph.setVertexDescriptionMode(configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 3f818b6bd9a..bd88fb89954 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1512,12 +1512,11 @@ public class StreamingJobGraphGenerator { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); - if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) + if (!(streamGraph.isChainingEnabled() + && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && arePartitionerAndExchangeModeChainable( - edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()) - && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() - && streamGraph.isChainingEnabled())) { + edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) { return false; } @@ 
-1601,6 +1600,14 @@ public class StreamingJobGraphGenerator { "Unknown chaining strategy: " + downStreamOperator.getChainingStrategy()); } + // Only vertices with the same parallelism can be chained. + isChainable &= upStreamVertex.getParallelism() == downStreamVertex.getParallelism(); + + if (!streamGraph.isChainingOfOperatorsWithDifferentMaxParallelismEnabled()) { + isChainable &= + upStreamVertex.getMaxParallelism() == downStreamVertex.getMaxParallelism(); + } + return isChainable; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 788cc84e1ca..eeca72d60c8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -1180,6 +1180,39 @@ class StreamingJobGraphGeneratorTest { assertThat(vertices.get(1).getOperatorIDs()).hasSize(5); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testChainingOfOperatorsWithDifferentMaxParallelism( + boolean chainingOfOperatorsWithDifferentMaxParallelismEnabled) throws Exception { + final Configuration configuration = new Configuration(); + configuration.set( + PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM, + chainingOfOperatorsWithDifferentMaxParallelismEnabled); + configuration.set(PipelineOptions.MAX_PARALLELISM, 10); + try (StreamExecutionEnvironment chainEnv = + StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) { + chainEnv.fromElements(1) + .map(x -> x) + // should automatically break chain here + .map(x -> x) + .setMaxParallelism(1) + .map(x -> x); + + final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph(); + + final List<JobVertex> vertices = 
jobGraph.getVerticesSortedTopologicallyFromSources(); + if (chainingOfOperatorsWithDifferentMaxParallelismEnabled) { + assertThat(vertices).hasSize(1); + assertThat(vertices.get(0).getOperatorIDs()).hasSize(4); + } else { + assertThat(vertices).hasSize(3); + assertThat(vertices.get(0).getOperatorIDs()).hasSize(2); + assertThat(vertices.get(1).getOperatorIDs()).hasSize(1); + assertThat(vertices.get(1).getOperatorIDs()).hasSize(1); + } + } + } + /** * Tests that {@link org.apache.flink.streaming.api.operators.YieldingOperatorFactory} are * chained to new sources, see FLINK-20444. diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 22f12738a0a..fd1a2a607ca 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -53,6 +53,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType", "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled", + "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." + + "isChainingOfOperatorsWithDifferentMaxParallelismEnabled", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." + "getStateHandleProvider", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getCheckpointInterval",