This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c539333e748324f2196f7a266f0bbf1e6b797c52
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Mon Jun 3 22:08:54 2019 +0200

    [hotfix][tests] Simplify SynchronousCheckpointITCase
---
 .../runtime/tasks/SynchronousCheckpointITCase.java | 136 +++++++--------------
 1 file changed, 42 insertions(+), 94 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 8791b94..054b047 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
@@ -65,17 +64,20 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -86,30 +88,11 @@ import static org.mockito.Mockito.when;
  */
 public class SynchronousCheckpointITCase {
 
-       private static OneShotLatch executionLatch;
-       private static OneShotLatch cancellationLatch;
-       private static OneShotLatch checkpointCompletionLatch;
-       private static OneShotLatch notifyLatch;
        private static OneShotLatch checkpointTriggered = new OneShotLatch();
 
-       private static MultiShotLatch checkpointLatch;
-
-       private static AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-       private static volatile CheckpointingStateHolder 
synchronousCheckpointPhase = new CheckpointingStateHolder();
-
-       @Before
-       public void initializeLatchesAndError() {
-               executionLatch = new OneShotLatch();
-               cancellationLatch = new OneShotLatch();
-               checkpointCompletionLatch = new OneShotLatch();
-               notifyLatch = new OneShotLatch();
-
-               checkpointLatch = new MultiShotLatch();
-
-               synchronousCheckpointPhase.setState(CheckpointingState.NONE);
-               error.set(null);
-       }
+       // A thread-safe queue to "log" and monitor events happening in the task's methods. Also, used by the test thread
+       // to synchronize actions with the task's threads.
+       private static LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
 
        @Rule
        public final Timeout timeoutPerTest = Timeout.seconds(10);
@@ -121,10 +104,10 @@ public class SynchronousCheckpointITCase {
                try (TaskCleaner ignored = new TaskCleaner(task)) {
                        task.startTaskThread();
 
-                       executionLatch.await();
+                       assertThat(eventQueue.take(), 
is(Event.TASK_IS_RUNNING));
+                       assertTrue(eventQueue.isEmpty());
 
                        assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
-                       assertEquals(CheckpointingState.NONE, 
synchronousCheckpointPhase.getState());
 
                        // Hack: we are triggering a checkpoint with 
advanceToEndOfEventTime = true, to be sure that
                        // triggerCheckpointBarrier has reached the sync 
checkpoint latch (by verifying in
@@ -137,33 +120,31 @@ public class SynchronousCheckpointITCase {
                                        156865867234L,
                                        new 
CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, 
CheckpointStorageLocationReference.getDefault()),
                                        true);
-                       checkpointLatch.await();
 
-                       assertNull(error.get());
-                       assertEquals(CheckpointingState.PERFORMING_CHECKPOINT, 
synchronousCheckpointPhase.getState());
+                       assertThat(eventQueue.take(), 
is(Event.PRE_TRIGGER_CHECKPOINT));
+                       assertTrue(eventQueue.isEmpty());
 
                        checkpointTriggered.await();
 
                        task.notifyCheckpointComplete(42);
 
-                       notifyLatch.await();
-                       assertNull(error.get());
-                       assertEquals(CheckpointingState.EXECUTED_CALLBACK, 
synchronousCheckpointPhase.getState());
+                       assertThat(eventQueue.take(), 
is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE));
+                       assertThat(
+                               Arrays.asList(eventQueue.take(), 
eventQueue.take()),
+                               
containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, 
Event.POST_TRIGGER_CHECKPOINT));
+                       assertTrue(eventQueue.isEmpty());
 
-                       checkpointCompletionLatch.trigger();
-                       checkpointLatch.await();
-
-                       assertNull(error.get());
-                       assertEquals(CheckpointingState.FINISHED_CHECKPOINT, 
synchronousCheckpointPhase.getState());
                        assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
                }
        }
 
        /**
         * A {@link StreamTask} which makes sure that the different phases of a 
synchronous checkpoint
-        * are reflected in the {@link 
SynchronousCheckpointITCase#synchronousCheckpointPhase} field.
+        * are reflected in the {@link SynchronousCheckpointITCase#eventQueue}.
         */
        public static class SynchronousCheckpointTestingTask extends StreamTask 
{
+               // Flag to emit the first event only once.
+               private boolean isRunning;
 
                public SynchronousCheckpointTestingTask(Environment 
environment) {
                        super(environment);
@@ -171,42 +152,38 @@ public class SynchronousCheckpointITCase {
 
                @Override
                protected void performDefaultAction(ActionContext context) 
throws Exception {
-                       executionLatch.trigger();
-                       cancellationLatch.await();
-                       context.allActionsCompleted();
+                       if (!isRunning) {
+                               isRunning = true;
+                               eventQueue.put(Event.TASK_IS_RUNNING);
+                       }
+                       if (isCanceled()) {
+                               context.allActionsCompleted();
+                       } else {
+                               context.actionsUnavailable();
+                       }
                }
 
                @Override
                protected void cancelTask() {
-                       cancellationLatch.trigger();
                }
 
                @Override
                public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, boolean 
advanceToEndOfEventTime) throws Exception {
-                       
SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.PERFORMING_CHECKPOINT);
-                       checkpointLatch.trigger();
-
-                       super.triggerCheckpoint(checkpointMetaData, 
checkpointOptions, advanceToEndOfEventTime);
-
-                       checkpointCompletionLatch.await();
-                       
SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.FINISHED_CHECKPOINT);
-                       checkpointLatch.trigger();
-
-                       return true;
+                       eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
+                       boolean result = 
super.triggerCheckpoint(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
+                       eventQueue.put(Event.POST_TRIGGER_CHECKPOINT);
+                       return result;
                }
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-                       
SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.EXECUTING_CALLBACK);
+                       eventQueue.put(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
                        super.notifyCheckpointComplete(checkpointId);
-
-                       
SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.EXECUTED_CALLBACK);
-                       notifyLatch.trigger();
+                       eventQueue.put(Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
                }
 
                @Override
                protected void init() {
-
                }
 
                @Override
@@ -235,41 +212,12 @@ public class SynchronousCheckpointITCase {
        /**
         * The different state transitions during a synchronous checkpoint 
along with their expected previous state.
         */
-       private enum CheckpointingState {
-               NONE(null),
-               PERFORMING_CHECKPOINT(NONE),
-               EXECUTING_CALLBACK(PERFORMING_CHECKPOINT),
-               EXECUTED_CALLBACK(EXECUTING_CALLBACK),
-               FINISHED_CHECKPOINT(EXECUTED_CALLBACK);
-
-               private final CheckpointingState expectedPreviousState;
-
-               CheckpointingState(final CheckpointingState previousState) {
-                       this.expectedPreviousState = previousState;
-               }
-
-               void checkValidStateTransition(final CheckpointingState 
actualPreviousState) {
-                       if (this.expectedPreviousState != actualPreviousState) {
-                               error.set(new AssertionError());
-                       }
-               }
-       }
-
-       /**
-        * A container holding the current {@link CheckpointingState}.
-        */
-       private static final class CheckpointingStateHolder {
-
-               private volatile CheckpointingState checkpointingState = null;
-
-               void setState(CheckpointingState state) {
-                       state.checkValidStateTransition(checkpointingState);
-                       checkpointingState = state;
-               }
-
-               CheckpointingState getState() {
-                       return checkpointingState;
-               }
+       private enum Event {
+               TASK_IS_RUNNING,
+               PRE_TRIGGER_CHECKPOINT,
+               PRE_NOTIFY_CHECKPOINT_COMPLETE,
+               POST_NOTIFY_CHECKPOINT_COMPLETE,
+               POST_TRIGGER_CHECKPOINT,
        }
 
        // --------------------------           Boilerplate tools copied from 
the TaskAsyncCallTest             --------------------------

Reply via email to