This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit be972ef13157783ab6ff1b6282e34cfd5f1e1d61 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Nov 27 11:43:04 2020 +0100 [FLINK-20391] Set FORWARD_EDGES_PIPELINED for BATCH ExecutionMode This closes #14249 --- .../streaming/api/graph/StreamGraphGenerator.java | 2 +- ...amGraphGeneratorExecutionModeDetectionTest.java | 18 ++++++-- .../api/graph/StreamingJobGraphGeneratorTest.java | 49 ++++++++++++++++++++++ .../datastream/DataStreamBatchExecutionITCase.java | 32 +++++++++++++- 4 files changed, 96 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 0d64872..6aca57b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -284,7 +284,7 @@ public class StreamGraphGenerator { checkpointConfig.disableCheckpointing(); } - graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED); + graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED); graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST); setDefaultBufferTimeout(-1); setBatchStateBackendAndTimerService(graph); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java index 582a82a..65fb375 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java @@ -126,7 +126,7 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger { assertThat( streamGraph, hasProperties( - GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED, + GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED, ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, false)); } @@ -189,7 +189,7 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger { assertThat( graph, hasProperties( - GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED, + GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED, ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, false)); } @@ -238,7 +238,7 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger { assertThat( graph, hasProperties( - GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED, + GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED, ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, false)); @@ -299,6 +299,18 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger { .appendValue(scheduleMode) .appendText("'"); } + + @Override + protected void describeMismatchSafely( + StreamGraph item, + Description mismatchDescription) { + mismatchDescription.appendText("was ") + .appendText("a StreamGraph with exchangeMode='") + .appendValue(item.getGlobalDataExchangeMode()) + .appendText("' and scheduleMode='") + .appendValue(item.getScheduleMode()) + .appendText("'"); + } }; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index ca4cc34..5239c84 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -93,6 +94,8 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.hamcrest.FeatureMatcher; +import org.hamcrest.Matcher; import org.junit.Assert; import org.junit.Test; @@ -108,6 +111,7 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -613,6 +617,51 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { sourceAndMapVertex.getProducedDataSets().get(0).getResultType()); } + @Test + public void testPartitionTypesInBatchMode() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(4); + env.disableOperatorChaining(); + DataStream<Integer> source = env.fromElements(1); + source + // set the same parallelism as the source to make it a FORWARD SHUFFLE + .map(value -> value).setParallelism(1) + .rescale() + .map(value -> value) + .rebalance() + .map(value -> value) + .keyBy(value -> value) + .map(value -> value) + .addSink(new DiscardingSink<>()); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + assertThat(verticesSorted.get(0) /* source - forward */, + hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED)); + assertThat(verticesSorted.get(1) /* rescale */, + hasOutputPartitionType(ResultPartitionType.BLOCKING)); + assertThat(verticesSorted.get(2) /* rebalance */, + hasOutputPartitionType(ResultPartitionType.BLOCKING)); + assertThat(verticesSorted.get(3) /* keyBy */, + hasOutputPartitionType(ResultPartitionType.BLOCKING)); + assertThat(verticesSorted.get(4) /* forward - sink */, + hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED)); + } + + private Matcher<JobVertex> hasOutputPartitionType(ResultPartitionType partitionType) { + return new FeatureMatcher<JobVertex, ResultPartitionType>( + equalTo(partitionType), + "output partition type", + "output partition type" + ) { + @Override + protected ResultPartitionType featureValueOf(JobVertex actual) { + return actual.getProducedDataSets().get(0).getResultType(); + } + }; + } + @Test(expected = UnsupportedOperationException.class) public void testConflictShuffleModeWithBufferTimeout() { testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH); diff --git a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java index dd8c674..8d229df 100644 --- a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java @@ -109,7 +109,7 @@ public class DataStreamBatchExecutionITCase { try (CloseableIterator<String> result = mapped.executeAndCollect()) { - // only the operators after the key-by "barrier" are restarted and will have the + // only the operators after the rebalance "barrier" are restarted and will have the // "attempt 1" suffix assertThat( iteratorToList(result), @@ -117,6 +117,36 @@ public class DataStreamBatchExecutionITCase { } } + /** + * We induce a failure in the last mapper. In BATCH execution mode the part of the pipeline + * before the rescale should not be re-executed. Only the part after that will restart. We + * check that by suffixing the attempt number to records and asserting the correct number. + */ + @Test + public void batchFailoverWithRescaleBarrier() throws Exception { + + final StreamExecutionEnvironment env = getExecutionEnvironment(); + + DataStreamSource<String> source = env.fromElements("foo", "bar"); + env.setParallelism(1); + + SingleOutputStreamOperator<String> mapped = source + .map(new SuffixAttemptId("a")) + .map(new SuffixAttemptId("b")) + .rescale() + .map(new SuffixAttemptId("c")).setParallelism(2) + .map(new OnceFailingMapper("d")).setParallelism(2); + + try (CloseableIterator<String> result = mapped.executeAndCollect()) { + + // only the operators after the rescale "barrier" are restarted and will have the + // "attempt 1" suffix + assertThat( + iteratorToList(result), + containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1")); + } + } + @Test public void batchReduceSingleResultPerKey() throws Exception { StreamExecutionEnvironment env = getExecutionEnvironment();