This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c539333e748324f2196f7a266f0bbf1e6b797c52 Author: Aleksey Pak <alek...@ververica.com> AuthorDate: Mon Jun 3 22:08:54 2019 +0200 [hotfix][tests] Simplify SynchronousCheckpointITCase --- .../runtime/tasks/SynchronousCheckpointITCase.java | 136 +++++++-------------- 1 file changed, 42 insertions(+), 94 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index 8791b94..054b047 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.PermanentBlobCache; @@ -65,17 +64,20 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.SerializedValue; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.util.Arrays; import java.util.Collections; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.LinkedBlockingQueue; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -86,30 +88,11 @@ import static org.mockito.Mockito.when; */ public class SynchronousCheckpointITCase { - private static OneShotLatch executionLatch; - private static OneShotLatch cancellationLatch; - private static OneShotLatch checkpointCompletionLatch; - private static OneShotLatch notifyLatch; private static OneShotLatch checkpointTriggered = new OneShotLatch(); - private static MultiShotLatch checkpointLatch; - - private static AtomicReference<Throwable> error = new AtomicReference<>(); - - private static volatile CheckpointingStateHolder synchronousCheckpointPhase = new CheckpointingStateHolder(); - - @Before - public void initializeLatchesAndError() { - executionLatch = new OneShotLatch(); - cancellationLatch = new OneShotLatch(); - checkpointCompletionLatch = new OneShotLatch(); - notifyLatch = new OneShotLatch(); - - checkpointLatch = new MultiShotLatch(); - - synchronousCheckpointPhase.setState(CheckpointingState.NONE); - error.set(null); - } + // A thread-safe queue to "log" and monitor events happening in the task's methods. Also, used by the test thread + // to synchronize actions with the task's threads. + private static LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(); @Rule public final Timeout timeoutPerTest = Timeout.seconds(10); @@ -121,10 +104,10 @@ public class SynchronousCheckpointITCase { try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); - executionLatch.await(); + assertThat(eventQueue.take(), is(Event.TASK_IS_RUNNING)); + assertTrue(eventQueue.isEmpty()); assertEquals(ExecutionState.RUNNING, task.getExecutionState()); - assertEquals(CheckpointingState.NONE, synchronousCheckpointPhase.getState()); // Hack: we are triggering a checkpoint with advanceToEndOfEventTime = true, to be sure that // triggerCheckpointBarrier has reached the sync checkpoint latch (by verifying in @@ -137,33 +120,31 @@ public class SynchronousCheckpointITCase { 156865867234L, new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), true); - checkpointLatch.await(); - assertNull(error.get()); - assertEquals(CheckpointingState.PERFORMING_CHECKPOINT, synchronousCheckpointPhase.getState()); + assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT)); + assertTrue(eventQueue.isEmpty()); checkpointTriggered.await(); task.notifyCheckpointComplete(42); - notifyLatch.await(); - assertNull(error.get()); - assertEquals(CheckpointingState.EXECUTED_CALLBACK, synchronousCheckpointPhase.getState()); + assertThat(eventQueue.take(), is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE)); + assertThat( + Arrays.asList(eventQueue.take(), eventQueue.take()), + containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, Event.POST_TRIGGER_CHECKPOINT)); + assertTrue(eventQueue.isEmpty()); - checkpointCompletionLatch.trigger(); - checkpointLatch.await(); - - assertNull(error.get()); - assertEquals(CheckpointingState.FINISHED_CHECKPOINT, synchronousCheckpointPhase.getState()); assertEquals(ExecutionState.RUNNING, task.getExecutionState()); } } /** * A {@link StreamTask} which makes sure that the different phases of a synchronous checkpoint - * are reflected in the {@link SynchronousCheckpointITCase#synchronousCheckpointPhase} field. + * are reflected in the {@link SynchronousCheckpointITCase#eventQueue}. */ public static class SynchronousCheckpointTestingTask extends StreamTask { + // Flag to emit the first event only once. + private boolean isRunning; public SynchronousCheckpointTestingTask(Environment environment) { super(environment); @@ -171,42 +152,38 @@ public class SynchronousCheckpointITCase { @Override protected void performDefaultAction(ActionContext context) throws Exception { - executionLatch.trigger(); - cancellationLatch.await(); - context.allActionsCompleted(); + if (!isRunning) { + isRunning = true; + eventQueue.put(Event.TASK_IS_RUNNING); + } + if (isCanceled()) { + context.allActionsCompleted(); + } else { + context.actionsUnavailable(); + } } @Override protected void cancelTask() { - cancellationLatch.trigger(); } @Override public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception { - SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.PERFORMING_CHECKPOINT); - checkpointLatch.trigger(); - - super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); - - checkpointCompletionLatch.await(); - SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.FINISHED_CHECKPOINT); - checkpointLatch.trigger(); - - return true; + eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT); + boolean result = super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); + eventQueue.put(Event.POST_TRIGGER_CHECKPOINT); + return result; } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.EXECUTING_CALLBACK); + eventQueue.put(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE); super.notifyCheckpointComplete(checkpointId); - - SynchronousCheckpointITCase.synchronousCheckpointPhase.setState(CheckpointingState.EXECUTED_CALLBACK); - notifyLatch.trigger(); + eventQueue.put(Event.POST_NOTIFY_CHECKPOINT_COMPLETE); } @Override protected void init() { - } @Override @@ -235,41 +212,12 @@ public class SynchronousCheckpointITCase { /** * The different state transitions during a synchronous checkpoint along with their expected previous state. */ - private enum CheckpointingState { - NONE(null), - PERFORMING_CHECKPOINT(NONE), - EXECUTING_CALLBACK(PERFORMING_CHECKPOINT), - EXECUTED_CALLBACK(EXECUTING_CALLBACK), - FINISHED_CHECKPOINT(EXECUTED_CALLBACK); - - private final CheckpointingState expectedPreviousState; - - CheckpointingState(final CheckpointingState previousState) { - this.expectedPreviousState = previousState; - } - - void checkValidStateTransition(final CheckpointingState actualPreviousState) { - if (this.expectedPreviousState != actualPreviousState) { - error.set(new AssertionError()); - } - } - } - - /** - * A container holding the current {@link CheckpointingState}. - */ - private static final class CheckpointingStateHolder { - - private volatile CheckpointingState checkpointingState = null; - - void setState(CheckpointingState state) { - state.checkValidStateTransition(checkpointingState); - checkpointingState = state; - } - - CheckpointingState getState() { - return checkpointingState; - } + private enum Event { + TASK_IS_RUNNING, + PRE_TRIGGER_CHECKPOINT, + PRE_NOTIFY_CHECKPOINT_COMPLETE, + POST_NOTIFY_CHECKPOINT_COMPLETE, + POST_TRIGGER_CHECKPOINT, } // -------------------------- Boilerplate tools copied from the TaskAsyncCallTest --------------------------