This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1838e9ce586e3ca68cdb6cede9c08458fe6857cb Author: Aleksey Pak <alek...@ververica.com> AuthorDate: Mon Jun 24 13:48:22 2019 +0200 [hotfix][tests] StreamTaskTestHarness: create StreamTask instance in the execution thread (task's thread) This would prepare some StreamTask tests to be compatible with a new invariant in the StreamTask that the mailbox loop should be running in the task's thread. The latter is decided by the thread that instantiates the task. --- .../runtime/tasks/StreamTaskTestHarness.java | 53 +++++++++++----------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index d6cd43c..c1f53c5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.LocalRecoveryConfig; @@ -58,8 +57,10 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Function; +import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Test harness for testing a {@link StreamTask}. @@ -91,8 +92,6 @@ public class StreamTaskTestHarness<OUT> { protected TestTaskStateManager taskStateManager; - private StreamTask<OUT, ?> task; - private TypeSerializer<OUT> outputSerializer; private TypeSerializer<StreamElement> outputStreamRecordSerializer; @@ -145,7 +144,7 @@ public class StreamTaskTestHarness<OUT> { } public ProcessingTimeService getProcessingTimeService() { - return task.getProcessingTimeService(); + return taskThread.task.getProcessingTimeService(); } /** @@ -231,15 +230,20 @@ public class StreamTaskTestHarness<OUT> { * */ public Thread invoke(StreamMockEnvironment mockEnv) throws Exception { + checkState(this.mockEnv == null); + checkState(this.taskThread == null); this.mockEnv = checkNotNull(mockEnv); initializeInputs(); initializeOutput(); - this.task = taskFactory.apply(mockEnv); - - taskThread = new TaskThread(task); + taskThread = new TaskThread(() -> taskFactory.apply(mockEnv)); taskThread.start(); + // Wait until the task is set + while (taskThread.task == null) { + Thread.sleep(10L); + } + return taskThread; } @@ -287,31 +291,24 @@ public class StreamTaskTestHarness<OUT> { * @throws Exception */ public void waitForTaskRunning(long timeout) throws Exception { - if (taskThread == null) { + if (taskThread == null || taskThread.task == null) { throw new IllegalStateException("Task thread was not started."); } - else { - if (taskThread.task instanceof StreamTask) { - StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task; - while (!streamTask.isRunning()) { - Thread.sleep(10); - if (!taskThread.isAlive()) { - if (taskThread.getError() != null) { - throw new Exception("Task Thread failed due to an error.", taskThread.getError()); - } else { - throw new Exception("Task Thread unexpectedly shut down."); - } - } + StreamTask<?, ?> streamTask = taskThread.task; + while (!streamTask.isRunning()) { + Thread.sleep(10); + if (!taskThread.isAlive()) { + if (taskThread.getError() != null) { + throw new Exception("Task Thread failed due to an error.", taskThread.getError()); + } else { + throw new Exception("Task Thread unexpectedly shut down."); } } - else { - throw new IllegalStateException("Not a StreamTask"); - } } } public StreamTask<OUT, ?> getTask() { - return task; + return taskThread.task; } /** @@ -442,17 +439,19 @@ public class StreamTaskTestHarness<OUT> { private class TaskThread extends Thread { - private final AbstractInvokable task; + private final Supplier<? extends StreamTask<OUT, ?>> taskFactory; + private volatile StreamTask<OUT, ?> task; private volatile Throwable error; - TaskThread(AbstractInvokable task) { + TaskThread(Supplier<? extends StreamTask<OUT, ?>> taskFactory) { super("Task Thread"); - this.task = task; + this.taskFactory = taskFactory; } @Override public void run() { + task = taskFactory.get(); try { task.invoke(); shutdownIOManager();