This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd54b584371b67e67015d104d34c2f19a446e2b3
Author: Robert Metzger <rmetz...@apache.org>
AuthorDate: Mon Jun 21 22:21:21 2021 +0200

    [FLINK-22464][tests] Fix OperatorCoordinator test which is stalling or slow with AdaptiveScheduler
    
    This closes #16229
---
 .../OperatorEventSendingCheckpointITCase.java      | 70 ++++++++++++++++++++--
 1 file changed, 64 insertions(+), 6 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 8d460f2..f6bf272 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -20,12 +20,17 @@ package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -46,7 +51,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayDecoratorBase;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.TriFunction;
@@ -55,7 +59,6 @@ import akka.actor.ActorSystem;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 import javax.annotation.Nullable;
 
@@ -85,7 +88,10 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
 
     @BeforeClass
     public static void setupMiniClusterAndEnv() throws Exception {
-        flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+        Configuration config = new Configuration();
+        // uncomment to run test with adaptive scheduler
+        // config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM, config);
         flinkCluster.start();
         TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
     }
@@ -126,7 +132,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
      * additionally a failure on the reader that triggers recovery.
      */
     @Test
-    @Category(FailsWithAdaptiveScheduler.class) // FLINK-22464
     public void testOperatorEventLostWithReaderFailure() throws Exception {
         final int[] eventsToLose = new int[] {1, 3};
 
@@ -205,6 +210,22 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
         env.setParallelism(1);
         env.enableCheckpointing(50);
 
+        // This test depends on checkpoints persisting progress from the source before the
+        // artificial exception gets triggered. Otherwise, the job will run for a long time (or
+        // forever) because the exception will be thrown before any checkpoint successfully
+        // completes.
+        //
+        // Checkpoints are triggered once the checkpoint scheduler gets started + a random initial
+        // delay. For DefaultScheduler, this mechanism is fine, because DS starts the checkpoint
+        // coordinator, then requests the required slots and then deploys the tasks. These
+        // operations take enough time to have a checkpoint triggered by the time the task starts
+        // running. AdaptiveScheduler starts the CheckpointCoordinator right before deploying tasks
+        // (when slots are available already), hence tasks will start running almost immediately,
+        // and the checkpoint gets triggered too late (it won't be able to complete before the
+        // artificial failure from this test)
+        // Therefore, the TestingNumberSequenceSource waits for a checkpoint before emitting all
+        // required messages.
+
         final DataStream<Long> numbers =
                 env.fromSource(
                                 new TestingNumberSequenceSource(1L, 
numElements, 3),
@@ -257,7 +278,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
     private static final class AssignAfterCheckpointEnumerator<
                     SplitT extends IteratorSourceSplit<?, ?>>
             extends IteratorSourceEnumerator<SplitT> {
-
         private final Queue<Integer> pendingRequests = new ArrayDeque<>();
         private final SplitEnumeratorContext<?> context;
 
@@ -293,10 +313,12 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
         private static final long serialVersionUID = 1L;
 
         private final int numSplits;
+        private final long numAllowedMessageBeforeCheckpoint;
 
         public TestingNumberSequenceSource(long from, long to, int numSplits) {
             super(from, to);
             this.numSplits = numSplits;
+            this.numAllowedMessageBeforeCheckpoint = (to - from) / numSplits;
         }
 
         @Override
@@ -306,6 +328,40 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
                     splitNumberRange(getFrom(), getTo(), numSplits);
             return new AssignAfterCheckpointEnumerator<>(enumContext, splits);
         }
+
+        @Override
+        public SourceReader<Long, NumberSequenceSplit> createReader(
+                SourceReaderContext readerContext) {
+            return new CheckpointListeningIteratorSourceReader(
+                    readerContext, numAllowedMessageBeforeCheckpoint);
+        }
+    }
+
+    private static class CheckpointListeningIteratorSourceReader extends 
IteratorSourceReader {
+        private boolean checkpointed = false;
+        private long messagesProduced = 0;
+        private final long numAllowedMessageBeforeCheckpoint;
+
+        public CheckpointListeningIteratorSourceReader(
+                SourceReaderContext context, long 
waitForCheckpointAfterMessages) {
+            super(context);
+            this.numAllowedMessageBeforeCheckpoint = 
waitForCheckpointAfterMessages;
+        }
+
+        @Override
+        public InputStatus pollNext(ReaderOutput output) {
+            if (messagesProduced < numAllowedMessageBeforeCheckpoint || 
checkpointed) {
+                messagesProduced++;
+                return super.pollNext(output);
+            } else {
+                return InputStatus.NOTHING_AVAILABLE;
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+            checkpointed = true;
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -421,11 +477,13 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {
 
         private boolean localRpcCreated;
 
-        public MiniClusterWithRpcIntercepting(final int numSlots) {
+        public MiniClusterWithRpcIntercepting(
+                final int numSlots, final Configuration configuration) {
             super(
                     new MiniClusterConfiguration.Builder()
                             .setRpcServiceSharing(RpcServiceSharing.SHARED)
                             .setNumTaskManagers(1)
+                            .setConfiguration(configuration)
                             .setNumSlotsPerTaskManager(numSlots)
                             .build());
         }

Reply via email to