Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149954576 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -58,99 +59,144 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.Executor; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TaskAsyncCallTest { - private static final int NUM_CALLS = 1000; - + private static int numCalls; + + /** Triggered at the beginning of {@link CheckpointsInOrderInvokable#invoke()}. */ private static OneShotLatch awaitLatch; + + /** + * Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)} + * was called {@link #numCalls} times. + */ private static OneShotLatch triggerLatch; + private static final List<ClassLoader> classLoaders = new ArrayList<>(); + @Before public void createQueuesAndActors() { + numCalls = 1000; + awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); + + classLoaders.clear(); } // ------------------------------------------------------------------------ // Tests // ------------------------------------------------------------------------ - + @Test - public void testCheckpointCallsInOrder() { - try { - Task task = createTask(); + public void testCheckpointCallsInOrder() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); - + awaitLatch.await(); - - for (int i = 1; i <= NUM_CALLS; i++) { + + for (int i = 1; i <= numCalls; i++) { task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); } - + triggerLatch.await(); - + assertFalse(task.isCanceledOrFailed()); ExecutionState currentState = task.getExecutionState(); - if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) { - fail("Task should be RUNNING or FINISHED, but is " + currentState); - } - - task.cancelExecution(); - task.getExecutingThread().join(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED)); } } @Test - public void testMixedAsyncCallsInOrder() { - try { - Task task = createTask(); + public void testMixedAsyncCallsInOrder() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); awaitLatch.await(); - for (int i = 1; i <= NUM_CALLS; i++) { + for (int i = 1; i <= numCalls; i++) { task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); task.notifyCheckpointComplete(i); } triggerLatch.await(); assertFalse(task.isCanceledOrFailed()); + ExecutionState currentState = task.getExecutionState(); - if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) { - fail("Task should be RUNNING or FINISHED, but is " + currentState); - } + assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED)); + } + } - task.cancelExecution(); - task.getExecutingThread().join(); + @Test + public void testThrowExceptionIfStopInvokedWithNotStoppableTask() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { + task.startTaskThread(); + awaitLatch.await(); + + try { + task.stopExecution(); + fail("Expected exception not thrown"); + } catch (UnsupportedOperationException e) { + assertThat(e.getMessage(), containsString("Stopping not supported by task")); + } } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + } + + /** + * Asserts that {@link StatefulTask#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)}, + * {@link StatefulTask#notifyCheckpointComplete(long)}, and {@link StoppableTask#stop()} are + * invoked by a thread whose context class loader is set to the user code class loader. + */ + @Test + public void testSetsUserCodeClassLoader() throws Exception { + numCalls = 1; + + Task task = createTask(ContextClassLoaderInterceptingInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { + task.startTaskThread(); + + awaitLatch.await(); + + task.triggerCheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()); + task.notifyCheckpointComplete(1); + task.stopExecution(); } + + // assert after task is canceled and executing thread is stopped to avoid race conditions + assertThat(classLoaders, hasSize(greaterThanOrEqualTo(3))); --- End diff -- I believe you are right. I introduced another latch to counter this.
---