This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d01de29  [FLINK-21175][tests] Fix unpredictable Thread.getState in 
StreamTaskTestHarness due to concurrent class loading
d01de29 is described below

commit d01de297171c929a9686221a3b813567b579858d
Author: Kezhu Wang <kez...@gmail.com>
AuthorDate: Thu Nov 19 22:07:39 2020 +0800

    [FLINK-21175][tests] Fix unpredictable Thread.getState in 
StreamTaskTestHarness due to concurrent class loading
---
 .../runtime/tasks/StreamTaskTestHarness.java       | 44 +++++++++++++++-------
 1 file changed, 30 insertions(+), 14 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index c91045d..1bd931a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -48,12 +48,14 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -67,7 +69,11 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -403,11 +409,7 @@ public class StreamTaskTestHarness<OUT> {
     /** This only returns after all input queues are empty. */
     public void waitForInputProcessing() throws Exception {
 
-        while (true) {
-            Throwable error = taskThread.getError();
-            if (error != null) {
-                throw new Exception("Exception in the task thread", error);
-            }
+        while (taskThread.isAlive()) {
 
             boolean allEmpty = true;
             for (int i = 0; i < numInputGates; i++) {
@@ -421,15 +423,25 @@ public class StreamTaskTestHarness<OUT> {
             }
         }
 
-        // then wait for the Task Thread to be in a blocked state
-        // Check whether the state is blocked, this should be the case if it 
cannot
-        // notifyNonEmpty more input, i.e. all currently available input has 
been processed.
-        while (true) {
-            Thread.State state = taskThread.getState();
-            if (state == Thread.State.BLOCKED
-                    || state == Thread.State.TERMINATED
-                    || state == Thread.State.WAITING
-                    || state == Thread.State.TIMED_WAITING) {
+        // Wait for all currently available input has been processed.
+        final AtomicBoolean allInputProcessed = new AtomicBoolean();
+        final MailboxProcessor mailboxProcessor = 
taskThread.task.mailboxProcessor;
+        final MailboxExecutor mailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
+        while (taskThread.isAlive()) {
+            try {
+                final CountDownLatch latch = new CountDownLatch(1);
+                mailboxExecutor.execute(
+                        () -> {
+                            
allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
+                            latch.countDown();
+                        },
+                        "query-whether-processInput-has-suspend-itself");
+                // Mail could be dropped due to task exception, so we do 
timed-await here.
+                latch.await(1, TimeUnit.SECONDS);
+            } catch (RejectedExecutionException ex) {
+                // Loop until task thread exit for possible task exception.
+            }
+            if (allInputProcessed.get()) {
                 break;
             }
 
@@ -437,6 +449,10 @@ public class StreamTaskTestHarness<OUT> {
                 Thread.sleep(1);
             } catch (InterruptedException ignored) {
             }
+            Throwable error = taskThread.getError();
+            if (error != null) {
+                throw new Exception("Exception in the task thread", error);
+            }
         }
     }
 

Reply via email to