This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new d01de29 [FLINK-21175][tests] Fix unpredictable Thread.getState in StreamTaskTestHarness due to concurrent class loading d01de29 is described below commit d01de297171c929a9686221a3b813567b579858d Author: Kezhu Wang <kez...@gmail.com> AuthorDate: Thu Nov 19 22:07:39 2020 +0800 [FLINK-21175][tests] Fix unpredictable Thread.getState in StreamTaskTestHarness due to concurrent class loading --- .../runtime/tasks/StreamTaskTestHarness.java | 44 +++++++++++++++------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index c91045d..1bd931a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -48,12 +48,14 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import 
org.apache.flink.util.function.FunctionWithException; @@ -67,7 +69,11 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -403,11 +409,7 @@ public class StreamTaskTestHarness<OUT> { /** This only returns after all input queues are empty. */ public void waitForInputProcessing() throws Exception { - while (true) { - Throwable error = taskThread.getError(); - if (error != null) { - throw new Exception("Exception in the task thread", error); - } + while (taskThread.isAlive()) { boolean allEmpty = true; for (int i = 0; i < numInputGates; i++) { @@ -421,15 +423,25 @@ public class StreamTaskTestHarness<OUT> { } } - // then wait for the Task Thread to be in a blocked state - // Check whether the state is blocked, this should be the case if it cannot - // notifyNonEmpty more input, i.e. all currently available input has been processed. - while (true) { - Thread.State state = taskThread.getState(); - if (state == Thread.State.BLOCKED - || state == Thread.State.TERMINATED - || state == Thread.State.WAITING - || state == Thread.State.TIMED_WAITING) { + // Wait for all currently available input has been processed. 
+ final AtomicBoolean allInputProcessed = new AtomicBoolean(); + final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor; + final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); + while (taskThread.isAlive()) { + try { + final CountDownLatch latch = new CountDownLatch(1); + mailboxExecutor.execute( + () -> { + allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable()); + latch.countDown(); + }, + "query-whether-processInput-has-suspend-itself"); + // Mail could be dropped due to task exception, so we do timed-await here. + latch.await(1, TimeUnit.SECONDS); + } catch (RejectedExecutionException ex) { + // Loop until task thread exit for possible task exception. + } + if (allInputProcessed.get()) { break; } @@ -437,6 +449,10 @@ public class StreamTaskTestHarness<OUT> { Thread.sleep(1); } catch (InterruptedException ignored) { } + Throwable error = taskThread.getError(); + if (error != null) { + throw new Exception("Exception in the task thread", error); + } } }