This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 434f59f53c8fe5cd47406472cd0ddf9051081f18
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Jun 26 14:20:48 2019 +0200

    [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest
---
 .../tasks/StreamTaskCancellationBarrierTest.java   |  1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    | 52 ++++++++++++++++------
 2 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 21427e0..92cf60b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -185,6 +185,7 @@ public class StreamTaskCancellationBarrierTest {
 
                @Override
                protected void init() throws Exception {
+                       super.init();
                        synchronized (lock) {
                                while (running) {
                                        lock.wait();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 949a75f..26c6faf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -108,6 +108,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
@@ -145,6 +146,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -826,11 +828,8 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void init() throws Exception {}
-
-               @Override
-               protected void performDefaultAction(DefaultActionContext context) throws Exception {
-                       context.allActionsCompleted();
+               protected void init() throws Exception {
+                       inputProcessor = new EmptyInputProcessor();
                }
 
                @Override
@@ -1019,7 +1018,6 @@ public class StreamTaskTest extends TestLogger {
        private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
                private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
-               private volatile boolean inputFinished;
 
                MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
                        super(env, null, uncaughtExceptionHandler);
@@ -1033,20 +1031,47 @@ public class StreamTaskTest extends TestLogger {
                        // here for test purposes.
                        super.operatorChain = this.overrideOperatorChain;
                        super.headOperator = super.operatorChain.getHeadOperator();
+                       super.inputProcessor = new EmptyInputProcessor(false);
+               }
+
+               void finishInput() {
+                       checkState(inputProcessor != null, "Tried to finishInput before MockStreamTask was started");
+                       ((EmptyInputProcessor) inputProcessor).finishInput();
+               }
+       }
+
+       private static class EmptyInputProcessor implements StreamInputProcessor {
+               private volatile boolean isFinished;
+
+               public EmptyInputProcessor() {
+                       this(true);
+               }
+
+               public EmptyInputProcessor(boolean startFinished) {
+                       isFinished = startFinished;
                }
 
                @Override
-               protected void performDefaultAction(DefaultActionContext context) {
-                       if (isCanceled() || inputFinished) {
-                               context.allActionsCompleted();
-                       }
+               public boolean processInput() throws Exception {
+                       return false;
                }
 
                @Override
-               protected void cleanup() throws Exception {}
+               public void close() throws IOException {
+               }
 
-               void finishInput() {
-                       this.inputFinished = true;
+               @Override
+               public boolean isFinished() {
+                       return isFinished;
+               }
+
+               @Override
+               public CompletableFuture<?> isAvailable() {
+                       return AVAILABLE;
+               }
+
+               public void finishInput() {
+                       isFinished = true;
                }
        }
 
@@ -1262,6 +1287,7 @@ public class StreamTaskTest extends TestLogger {
 
                @Override
                protected void init() throws Exception {
+                       super.init();
                        getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
                                @Override
                                public void onProcessingTime(long timestamp) throws Exception {

Reply via email to