This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd54b584371b67e67015d104d34c2f19a446e2b3 Author: Robert Metzger <rmetz...@apache.org> AuthorDate: Mon Jun 21 22:21:21 2021 +0200 [FLINK-22464][tests] Fix OperatorCoordinator test which is stalling or slow with AdaptiveScheduler This closes #16229 --- .../OperatorEventSendingCheckpointITCase.java | 70 ++++++++++++++++++++-- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java index 8d460f2..f6bf272 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java @@ -20,12 +20,17 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ 
-46,7 +51,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayDecoratorBase; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.TriFunction; @@ -55,7 +59,6 @@ import akka.actor.ActorSystem; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.experimental.categories.Category; import javax.annotation.Nullable; @@ -85,7 +88,10 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { @BeforeClass public static void setupMiniClusterAndEnv() throws Exception { - flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM); + Configuration config = new Configuration(); + // uncomment to run test with adaptive scheduler + // config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); + flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM, config); flinkCluster.start(); TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM); } @@ -126,7 +132,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { * additionally a failure on the reader that triggers recovery. */ @Test - @Category(FailsWithAdaptiveScheduler.class) // FLINK-22464 public void testOperatorEventLostWithReaderFailure() throws Exception { final int[] eventsToLose = new int[] {1, 3}; @@ -205,6 +210,22 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { env.setParallelism(1); env.enableCheckpointing(50); + // This test depends on checkpoints persisting progress from the source before the + // artificial exception gets triggered. 
Otherwise, the job will run for a long time (or + // forever) because the exception will be thrown before any checkpoint successfully + // completes. + // + // Checkpoints are triggered once the checkpoint scheduler gets started + a random initial + // delay. For DefaultScheduler, this mechanism is fine, because DS starts the checkpoint + // coordinator, then requests the required slots and then deploys the tasks. These + // operations take enough time to have a checkpoint triggered by the time the task starts + // running. AdaptiveScheduler starts the CheckpointCoordinator right before deploying tasks + // (when slots are available already), hence tasks will start running almost immediately, + // and the checkpoint gets triggered too late (it won't be able to complete before the + // artificial failure from this test). + // Therefore, the TestingNumberSequenceSource waits for a checkpoint before emitting all + // required messages. + final DataStream<Long> numbers = env.fromSource( new TestingNumberSequenceSource(1L, numElements, 3), @@ -257,7 +278,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { private static final class AssignAfterCheckpointEnumerator< SplitT extends IteratorSourceSplit<?, ?>> extends IteratorSourceEnumerator<SplitT> { - private final Queue<Integer> pendingRequests = new ArrayDeque<>(); private final SplitEnumeratorContext<?> context; @@ -293,10 +313,12 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { private static final long serialVersionUID = 1L; private final int numSplits; + private final long numAllowedMessageBeforeCheckpoint; public TestingNumberSequenceSource(long from, long to, int numSplits) { super(from, to); this.numSplits = numSplits; + this.numAllowedMessageBeforeCheckpoint = (to - from) / numSplits; } @Override @@ -306,6 +328,40 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { splitNumberRange(getFrom(), getTo(), numSplits); return new
AssignAfterCheckpointEnumerator<>(enumContext, splits); } + + @Override + public SourceReader<Long, NumberSequenceSplit> createReader( + SourceReaderContext readerContext) { + return new CheckpointListeningIteratorSourceReader( + readerContext, numAllowedMessageBeforeCheckpoint); + } + } + + private static class CheckpointListeningIteratorSourceReader extends IteratorSourceReader { + private boolean checkpointed = false; + private long messagesProduced = 0; + private final long numAllowedMessageBeforeCheckpoint; + + public CheckpointListeningIteratorSourceReader( + SourceReaderContext context, long waitForCheckpointAfterMessages) { + super(context); + this.numAllowedMessageBeforeCheckpoint = waitForCheckpointAfterMessages; + } + + @Override + public InputStatus pollNext(ReaderOutput output) { + if (messagesProduced < numAllowedMessageBeforeCheckpoint || checkpointed) { + messagesProduced++; + return super.pollNext(output); + } else { + return InputStatus.NOTHING_AVAILABLE; + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + checkpointed = true; + } } // ------------------------------------------------------------------------ @@ -421,11 +477,13 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { private boolean localRpcCreated; - public MiniClusterWithRpcIntercepting(final int numSlots) { + public MiniClusterWithRpcIntercepting( + final int numSlots, final Configuration configuration) { super( new MiniClusterConfiguration.Builder() .setRpcServiceSharing(RpcServiceSharing.SHARED) .setNumTaskManagers(1) + .setConfiguration(configuration) .setNumSlotsPerTaskManager(numSlots) .build()); }