This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new bf16a7d  [FLINK-20079][task] Initialize operator chain before upstream partition request
bf16a7d is described below

commit bf16a7dd2a2786847abd440c69ab8ade59853a1d
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu Oct 29 17:51:49 2020 +0100

    [FLINK-20079][task] Initialize operator chain before upstream partition request
---
 .../flink/streaming/runtime/tasks/StreamTask.java | 37 ++++++++++------------
 .../checkpointing/UnalignedCheckpointITCase.java | 11 +++++--
 2 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c33772e..0eef744 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -499,32 +499,29 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		// we need to make sure that any triggers scheduled in open() cannot be
 		// executed before all operators are opened
 		actionExecutor.runThrowing(() -> {
-			// both the following operations are protected by the lock
-			// so that we avoid race conditions in the case that initializeState()
-			// registers a timer, that fires before the open() is called.
-			readRecoveredChannelState(); // WARN: should be done before operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+
+			SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
+			reader.readOutputData(getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());
 			operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
-		});
 
-		isRunning = true;
-	}
+			channelIOExecutor.execute(() -> {
+				try {
+					reader.readInputData(getEnvironment().getAllInputGates());
+				} catch (Exception e) {
+					asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
+				}
+			});
 
-	private void readRecoveredChannelState() throws IOException, InterruptedException {
-		SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
-		reader.readOutputData(getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());
-		channelIOExecutor.execute(() -> {
-			try {
-				reader.readInputData(getEnvironment().getAllInputGates());
-			} catch (Exception e) {
-				asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
+			for (InputGate inputGate : getEnvironment().getAllInputGates()) {
+				inputGate
+					.getStateConsumedFuture()
+					.thenRun(() -> mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request partitions"));
 			}
+
 		});
 
-		for (InputGate inputGate : getEnvironment().getAllInputGates()) {
-			inputGate
-				.getStateConsumedFuture()
-				.thenRun(() -> mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request partitions"));
-		}
+
+		isRunning = true;
 	}
 
 	@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 5a036db..239243c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -80,6 +80,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -211,7 +212,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
 
 		final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
 		env.enableCheckpointing(100);
-		env.getCheckpointConfig().setAlignmentTimeout(0);
+		env.getCheckpointConfig().setAlignmentTimeout(1);
 		env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(EXPECTED_FAILURES, Time.milliseconds(100)));
 		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
@@ -462,6 +463,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
 		private ListState<State> stateList;
 		private State state;
 		private final long minCheckpoints;
+		private Random random = new Random();
 
 		private VerifyingSink(long minCheckpoints) {
 			this.minCheckpoints = minCheckpoints;
@@ -470,6 +472,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
+			random = new Random();
 			getRuntimeContext().addAccumulator(NUM_OUTPUTS, numOutputCounter);
 			getRuntimeContext().addAccumulator(NUM_OUT_OF_ORDER, outOfOrderCounter);
 			getRuntimeContext().addAccumulator(NUM_DUPLICATES, duplicatesCounter);
@@ -531,8 +534,10 @@ public class UnalignedCheckpointITCase extends TestLogger {
 			state.numOutput++;
 
 			if (state.completedCheckpoints < minCheckpoints) {
-				// induce heavy backpressure until enough checkpoints have been written
-				Thread.sleep(0, 100_000);
+				// induce backpressure until enough checkpoints have been written
+				if (random.nextInt(1000) == 42) {
+					Thread.sleep(1);
+				}
 			}
 			// after all checkpoints have been completed, the remaining data should be flushed out fairly quickly
 		}