This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1838e9ce586e3ca68cdb6cede9c08458fe6857cb
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Mon Jun 24 13:48:22 2019 +0200

    [hotfix][tests] StreamTaskTestHarness: create StreamTask instance in the 
execution thread (task's thread)
    
    This would prepare some StreamTask tests to be compatible with a new 
invariant in the StreamTask that the mailbox loop should be running in the 
task's thread. The latter is decided by the thread that instantiates the task.
---
 .../runtime/tasks/StreamTaskTestHarness.java       | 53 +++++++++++-----------
 1 file changed, 26 insertions(+), 27 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index d6cd43c..c1f53c5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import 
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -58,8 +57,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Test harness for testing a {@link StreamTask}.
@@ -91,8 +92,6 @@ public class StreamTaskTestHarness<OUT> {
 
        protected TestTaskStateManager taskStateManager;
 
-       private StreamTask<OUT, ?> task;
-
        private TypeSerializer<OUT> outputSerializer;
        private TypeSerializer<StreamElement> outputStreamRecordSerializer;
 
@@ -145,7 +144,7 @@ public class StreamTaskTestHarness<OUT> {
        }
 
        public ProcessingTimeService getProcessingTimeService() {
-               return task.getProcessingTimeService();
+               return taskThread.task.getProcessingTimeService();
        }
 
        /**
@@ -231,15 +230,20 @@ public class StreamTaskTestHarness<OUT> {
         *
         */
        public Thread invoke(StreamMockEnvironment mockEnv) throws Exception {
+               checkState(this.mockEnv == null);
+               checkState(this.taskThread == null);
                this.mockEnv = checkNotNull(mockEnv);
 
                initializeInputs();
                initializeOutput();
 
-               this.task = taskFactory.apply(mockEnv);
-
-               taskThread = new TaskThread(task);
+               taskThread = new TaskThread(() -> taskFactory.apply(mockEnv));
                taskThread.start();
+               // Wait until the task is set
+               while (taskThread.task == null) {
+                       Thread.sleep(10L);
+               }
+
                return taskThread;
        }
 
@@ -287,31 +291,24 @@ public class StreamTaskTestHarness<OUT> {
         * @throws Exception
         */
        public void waitForTaskRunning(long timeout) throws Exception {
-               if (taskThread == null) {
+               if (taskThread == null || taskThread.task == null) {
                        throw new IllegalStateException("Task thread was not 
started.");
                }
-               else {
-                       if (taskThread.task instanceof StreamTask) {
-                               StreamTask<?, ?> streamTask = (StreamTask<?, 
?>) taskThread.task;
-                               while (!streamTask.isRunning()) {
-                                       Thread.sleep(10);
-                                       if (!taskThread.isAlive()) {
-                                               if (taskThread.getError() != 
null) {
-                                                       throw new 
Exception("Task Thread failed due to an error.", taskThread.getError());
-                                               } else {
-                                                       throw new 
Exception("Task Thread unexpectedly shut down.");
-                                               }
-                                       }
+               StreamTask<?, ?> streamTask = taskThread.task;
+               while (!streamTask.isRunning()) {
+                       Thread.sleep(10);
+                       if (!taskThread.isAlive()) {
+                               if (taskThread.getError() != null) {
+                                       throw new Exception("Task Thread failed 
due to an error.", taskThread.getError());
+                               } else {
+                                       throw new Exception("Task Thread 
unexpectedly shut down.");
                                }
                        }
-                       else {
-                               throw new IllegalStateException("Not a 
StreamTask");
-                       }
                }
        }
 
        public StreamTask<OUT, ?> getTask() {
-               return task;
+               return taskThread.task;
        }
 
        /**
@@ -442,17 +439,19 @@ public class StreamTaskTestHarness<OUT> {
 
        private class TaskThread extends Thread {
 
-               private final AbstractInvokable task;
+               private final Supplier<? extends StreamTask<OUT, ?>> 
taskFactory;
+               private volatile StreamTask<OUT, ?> task;
 
                private volatile Throwable error;
 
-               TaskThread(AbstractInvokable task) {
+               TaskThread(Supplier<? extends StreamTask<OUT, ?>> taskFactory) {
                        super("Task Thread");
-                       this.task = task;
+                       this.taskFactory = taskFactory;
                }
 
                @Override
                public void run() {
+                       task = taskFactory.get();
                        try {
                                task.invoke();
                                shutdownIOManager();

Reply via email to