This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bf16a7d  [FLINK-20079][task] Initialize operator chain before upstream partition request
bf16a7d is described below

commit bf16a7dd2a2786847abd440c69ab8ade59853a1d
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu Oct 29 17:51:49 2020 +0100

    [FLINK-20079][task] Initialize operator chain before upstream partition request
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 37 ++++++++++------------
 .../checkpointing/UnalignedCheckpointITCase.java   | 11 +++++--
 2 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c33772e..0eef744 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -499,32 +499,29 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                // we need to make sure that any triggers scheduled in open() 
cannot be
                // executed before all operators are opened
                actionExecutor.runThrowing(() -> {
-                       // both the following operations are protected by the 
lock
-                       // so that we avoid race conditions in the case that 
initializeState()
-                       // registers a timer, that fires before the open() is 
called.
-                       readRecoveredChannelState(); // WARN: should be done 
before operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+
+                       SequentialChannelStateReader reader = 
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
+                       reader.readOutputData(getEnvironment().getAllWriters(), 
!configuration.isGraphContainingLoops());
 
                        
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
-               });
 
-               isRunning = true;
-       }
+                       channelIOExecutor.execute(() -> {
+                               try {
+                                       
reader.readInputData(getEnvironment().getAllInputGates());
+                               } catch (Exception e) {
+                                       
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
+                               }
+                       });
 
-       private void readRecoveredChannelState() throws IOException, 
InterruptedException {
-               SequentialChannelStateReader reader = 
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
-               reader.readOutputData(getEnvironment().getAllWriters(), 
!configuration.isGraphContainingLoops());
-               channelIOExecutor.execute(() -> {
-                       try {
-                               
reader.readInputData(getEnvironment().getAllInputGates());
-                       } catch (Exception e) {
-                               
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
+                       for (InputGate inputGate : 
getEnvironment().getAllInputGates()) {
+                               inputGate
+                                       .getStateConsumedFuture()
+                                       .thenRun(() -> 
mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request 
partitions"));
                        }
+
                });
-               for (InputGate inputGate : getEnvironment().getAllInputGates()) 
{
-                       inputGate
-                               .getStateConsumedFuture()
-                               .thenRun(() -> 
mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request 
partitions"));
-               }
+
+               isRunning = true;
        }
 
        @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 5a036db..239243c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -80,6 +80,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -211,7 +212,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
 
                final LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
                env.enableCheckpointing(100);
-               env.getCheckpointConfig().setAlignmentTimeout(0);
+               env.getCheckpointConfig().setAlignmentTimeout(1);
                env.setParallelism(parallelism);
                
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(EXPECTED_FAILURES, 
Time.milliseconds(100)));
                env.getCheckpointConfig().enableUnalignedCheckpoints(true);
@@ -462,6 +463,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
                private ListState<State> stateList;
                private State state;
                private final long minCheckpoints;
+               private Random random = new Random();
 
                private VerifyingSink(long minCheckpoints) {
                        this.minCheckpoints = minCheckpoints;
@@ -470,6 +472,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
                @Override
                public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
+                       random = new Random();
                        getRuntimeContext().addAccumulator(NUM_OUTPUTS, 
numOutputCounter);
                        getRuntimeContext().addAccumulator(NUM_OUT_OF_ORDER, 
outOfOrderCounter);
                        getRuntimeContext().addAccumulator(NUM_DUPLICATES, 
duplicatesCounter);
@@ -531,8 +534,10 @@ public class UnalignedCheckpointITCase extends TestLogger {
                        state.numOutput++;
 
                        if (state.completedCheckpoints < minCheckpoints) {
-                               // induce heavy backpressure until enough 
checkpoints have been written
-                               Thread.sleep(0, 100_000);
+                               // induce backpressure until enough checkpoints 
have been written
+                               if (random.nextInt(1000) == 42) {
+                                       Thread.sleep(1);
+                               }
                        }
                        // after all checkpoints have been completed, the 
remaining data should be flushed out fairly quickly
                }

Reply via email to