This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be972ef13157783ab6ff1b6282e34cfd5f1e1d61
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Nov 27 11:43:04 2020 +0100

    [FLINK-20391] Set FORWARD_EDGES_PIPELINED for BATCH ExecutionMode
    
    This closes #14249
---
 .../streaming/api/graph/StreamGraphGenerator.java  |  2 +-
 ...amGraphGeneratorExecutionModeDetectionTest.java | 18 ++++++--
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 49 ++++++++++++++++++++++
 .../datastream/DataStreamBatchExecutionITCase.java | 32 +++++++++++++-
 4 files changed, 96 insertions(+), 5 deletions(-)
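
For context (not part of this commit), a minimal, hypothetical sketch of a DataStream pipeline that exercises the changed default. It uses only public 1.12 APIs (RuntimeExecutionMode, rebalance(), DiscardingSink), mirroring the new StreamingJobGraphGeneratorTest below: with FORWARD_EDGES_PIPELINED, only forward edges stay pipelined in BATCH mode, while redistributing exchanges (rescale, rebalance, keyBy) become blocking. The class and job name are illustrative only.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

    public class BatchExchangeModeSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            env.setParallelism(1);

            env.fromElements(1, 2, 3)
                // same parallelism as the source -> FORWARD edge, stays pipelined
                .map(value -> value)
                // redistributing exchange -> becomes a blocking edge in BATCH mode
                .rebalance()
                .map(value -> value)
                .addSink(new DiscardingSink<>());

            env.execute("batch-exchange-mode-sketch");
        }
    }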

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0d64872..6aca57b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -284,7 +284,7 @@ public class StreamGraphGenerator {
                                checkpointConfig.disableCheckpointing();
                        }
 
-                       graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+                       graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
                        graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
                        setDefaultBufferTimeout(-1);
                        setBatchStateBackendAndTimerService(graph);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java
index 582a82a..65fb375 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java
@@ -126,7 +126,7 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger {
                assertThat(
                                streamGraph,
                                hasProperties(
-                                               GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
+                                               GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED,
                                                ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
                                                false));
        }
@@ -189,7 +189,7 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger {
                assertThat(
                                graph,
                                hasProperties(
-                                               GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
+                                               GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED,
                                                ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
                                                false));
        }
@@ -238,7 +238,7 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger {
                assertThat(
                                graph,
                                hasProperties(
-                                               GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
+                                               GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED,
                                                ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
                                                false));
 
@@ -299,6 +299,18 @@ public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger {
                                                .appendValue(scheduleMode)
                                                .appendText("'");
                        }
+
+                       @Override
+                       protected void describeMismatchSafely(
+                                       StreamGraph item,
+                                       Description mismatchDescription) {
+                               mismatchDescription.appendText("was ")
+                                       .appendText("a StreamGraph with exchangeMode='")
+                                       .appendValue(item.getGlobalDataExchangeMode())
+                                       .appendText("' and scheduleMode='")
+                                       .appendValue(item.getScheduleMode())
+                                       .appendText("'");
+                       }
                };
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index ca4cc34..5239c84 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -93,6 +94,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -108,6 +111,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -613,6 +617,51 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                        sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
        }
 
+       @Test
+       public void testPartitionTypesInBatchMode() {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+               env.setParallelism(4);
+               env.disableOperatorChaining();
+               DataStream<Integer> source = env.fromElements(1);
+               source
+                       // set the same parallelism as the source to make it a FORWARD SHUFFLE
+                       .map(value -> value).setParallelism(1)
+                       .rescale()
+                       .map(value -> value)
+                       .rebalance()
+                       .map(value -> value)
+                       .keyBy(value -> value)
+                       .map(value -> value)
+                       .addSink(new DiscardingSink<>());
+
+               JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+               List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+               assertThat(verticesSorted.get(0) /* source - forward */,
+                       hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED));
+               assertThat(verticesSorted.get(1) /* rescale */,
+                       hasOutputPartitionType(ResultPartitionType.BLOCKING));
+               assertThat(verticesSorted.get(2) /* rebalance */,
+                       hasOutputPartitionType(ResultPartitionType.BLOCKING));
+               assertThat(verticesSorted.get(3) /* keyBy */,
+                       hasOutputPartitionType(ResultPartitionType.BLOCKING));
+               assertThat(verticesSorted.get(4) /* forward - sink */,
+                       hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED));
+       }
+
+       private Matcher<JobVertex> hasOutputPartitionType(ResultPartitionType partitionType) {
+               return new FeatureMatcher<JobVertex, ResultPartitionType>(
+                       equalTo(partitionType),
+                       "output partition type",
+                       "output partition type"
+               ) {
+                       @Override
+                       protected ResultPartitionType featureValueOf(JobVertex actual) {
+                               return actual.getProducedDataSets().get(0).getResultType();
+                       }
+               };
+       }
+
        @Test(expected = UnsupportedOperationException.class)
        public void testConflictShuffleModeWithBufferTimeout() {
                testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
diff --git a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
index dd8c674..8d229df 100644
--- a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
@@ -109,7 +109,7 @@ public class DataStreamBatchExecutionITCase {
 
                try (CloseableIterator<String> result = mapped.executeAndCollect()) {
 
-                       // only the operators after the key-by "barrier" are restarted and will have the
+                       // only the operators after the rebalance "barrier" are restarted and will have the
                        // "attempt 1" suffix
                        assertThat(
                                        iteratorToList(result),
@@ -117,6 +117,36 @@ public class DataStreamBatchExecutionITCase {
                }
        }
 
+       /**
+        * We induce a failure in the last mapper. In BATCH execution mode the part of the pipeline
+        * before the rescale should not be re-executed. Only the part after that will restart. We
+        * check that by suffixing the attempt number to records and asserting the correct number.
+        */
+       @Test
+       public void batchFailoverWithRescaleBarrier() throws Exception {
+
+               final StreamExecutionEnvironment env = getExecutionEnvironment();
+
+               DataStreamSource<String> source = env.fromElements("foo", "bar");
+               env.setParallelism(1);
+
+               SingleOutputStreamOperator<String> mapped = source
+                       .map(new SuffixAttemptId("a"))
+                       .map(new SuffixAttemptId("b"))
+                       .rescale()
+                       .map(new SuffixAttemptId("c")).setParallelism(2)
+                       .map(new OnceFailingMapper("d")).setParallelism(2);
+
+               try (CloseableIterator<String> result = mapped.executeAndCollect()) {
+
+                       // only the operators after the rescale "barrier" are restarted and will have the
+                       // "attempt 1" suffix
+                       assertThat(
+                               iteratorToList(result),
+                               containsInAnyOrder("foo-a0-b0-c1-d1", "bar-a0-b0-c1-d1"));
+               }
+       }
+
        @Test
        public void batchReduceSingleResultPerKey() throws Exception {
                StreamExecutionEnvironment env = getExecutionEnvironment();
