This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 6e7703738cd [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
6e7703738cd is described below

commit 6e7703738cdefed17277ea86d2c9dc25393eceac
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Sat Feb 25 21:24:46 2023 +0000

    [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
    
    - Only obtain execution exception if the job is in globally terminal state
    
    - [hotfix] Unsubscribe finished TestEventSources from test commands. Otherwise,
      any command with SINGLE_SUBTASK scope might be dispatched to a finished source.
      This will result in TestJobExecutor.waitForFailover timing out while waiting for the
      command to be executed and ACKed.
    
    - [hotfix] Mark TestEventSource.scheduledCommands volatile
    
    - [hotfix] Make sure to process all commands in TestEventSource
---
 .../operators/lifecycle/TestJobExecutor.java       | 25 +++++++++++-----------
 .../command/SharedTestCommandDispatcher.java       |  5 +++++
 .../lifecycle/command/TestCommandDispatcher.java   |  2 ++
 .../command/TestCommandDispatcherImpl.java         |  5 +++++
 .../operators/lifecycle/graph/TestEventSource.java | 23 ++++++++++++++++----
 5 files changed, 44 insertions(+), 16 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
index 9082702c3d1..5df5fc4fdf4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
@@ -164,22 +164,23 @@ public class TestJobExecutor {
     }
 
     private void handleFailoverTimeout(TimeoutException e) throws Exception {
+        JobStatus jobStatus = 
miniClusterResource.getClusterClient().getJobStatus(jobID).get();
         String message =
                 String.format(
                         "Unable to failover the job: %s; job status: %s",
-                        e.getMessage(),
-                        
miniClusterResource.getClusterClient().getJobStatus(jobID).get());
-        Optional<SerializedThrowable> throwable =
-                miniClusterResource
-                        .getClusterClient()
-                        .requestJobResult(jobID)
-                        .get()
-                        .getSerializedThrowable();
-        if (throwable.isPresent()) {
-            throw new RuntimeException(message, throwable.get());
-        } else {
-            throw new RuntimeException(message);
+                        e.getMessage(), jobStatus);
+        if (jobStatus.isGloballyTerminalState()) {
+            Optional<SerializedThrowable> throwable =
+                    miniClusterResource
+                            .getClusterClient()
+                            .requestJobResult(jobID)
+                            .get()
+                            .getSerializedThrowable();
+            if (throwable.isPresent()) {
+                throw new RuntimeException(message, throwable.get());
+            }
         }
+        throw new RuntimeException(message);
     }
 
     public TestJobExecutor sendBroadcastCommand(TestCommand command, 
TestCommandScope scope) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
index 3ad52515808..d91c96f5cbc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
@@ -45,4 +45,9 @@ class SharedTestCommandDispatcher implements 
TestCommandDispatcher {
     public void broadcast(TestCommand testCommand, TestCommandScope scope) {
         ref.get().broadcast(testCommand, scope);
     }
+
+    @Override
+    public void unsubscribe(String operatorID, CommandExecutor 
commandExecutor) {
+        ref.get().unsubscribe(operatorID, commandExecutor);
+    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
index cdf46495102..3af13663b48 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
@@ -33,6 +33,8 @@ public interface TestCommandDispatcher extends Serializable {
 
     void broadcast(TestCommand command, TestCommandScope scope);
 
+    void unsubscribe(String operatorID, CommandExecutor commandExecutor);
+
     /** An executor of {@link TestCommand}s. */
     interface CommandExecutor {
         void execute(TestCommand testCommand);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
index fc6f8f9386c..f815a38ae19 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
@@ -52,6 +52,11 @@ class TestCommandDispatcherImpl implements 
TestCommandDispatcher {
                         .collect(Collectors.toList()));
     }
 
+    @Override
+    public void unsubscribe(String operatorID, CommandExecutor 
commandExecutor) {
+        subscribers.getOrDefault(operatorID, 
emptyList()).remove(commandExecutor);
+    }
+
     private void executeInternal(
             TestCommand command, TestCommandScope scope, List<CommandExecutor> 
executors) {
         checkState(!executors.isEmpty(), "no executors for command: " + 
command);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
index 5d0fd80276a..6a778770558 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.lifecycle.graph;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
 import 
org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
+import 
org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.CommandExecutor;
 import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
 import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
 import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
@@ -28,6 +29,9 @@ import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -40,11 +44,13 @@ import static 
org.apache.flink.runtime.operators.lifecycle.command.TestCommand.F
  */
 public class TestEventSource extends RichSourceFunction<TestDataElement>
         implements ParallelSourceFunction<TestDataElement> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestEventSource.class);
     private final String operatorID;
     private final TestCommandDispatcher commandQueue;
-    private transient Queue<TestCommand> scheduledCommands;
+    private transient volatile Queue<TestCommand> scheduledCommands;
     private transient volatile boolean isRunning = true;
     private final TestEventQueue eventQueue;
+    private transient volatile CommandExecutor commandExecutor;
 
     public TestEventSource(
             String operatorID, TestEventQueue eventQueue, 
TestCommandDispatcher commandQueue) {
@@ -58,7 +64,8 @@ public class TestEventSource extends 
RichSourceFunction<TestDataElement>
         super.open(parameters);
         this.isRunning = true;
         this.scheduledCommands = new LinkedBlockingQueue<>();
-        this.commandQueue.subscribe(cmd -> scheduledCommands.add(cmd), 
operatorID);
+        this.commandExecutor = cmd -> scheduledCommands.add(cmd);
+        this.commandQueue.subscribe(commandExecutor, operatorID);
         this.eventQueue.add(
                 new OperatorStartedEvent(
                         operatorID,
@@ -69,12 +76,12 @@ public class TestEventSource extends 
RichSourceFunction<TestDataElement>
     @Override
     public void run(SourceContext<TestDataElement> ctx) {
         long lastSent = 0;
-        while (isRunning) {
+        while (isRunning || !scheduledCommands.isEmpty()) {
             // Don't finish the source if it has not sent at least one value.
             TestCommand cmd = lastSent == 0 ? null : scheduledCommands.poll();
             if (cmd == FINISH_SOURCES) {
                 ack(cmd);
-                isRunning = false;
+                stop();
             } else if (cmd == FAIL) {
                 ack(cmd);
                 throw new RuntimeException("requested to fail");
@@ -103,6 +110,14 @@ public class TestEventSource extends 
RichSourceFunction<TestDataElement>
 
     @Override
     public void cancel() {
+        stop();
+    }
+
+    private void stop() {
+        commandQueue.unsubscribe(operatorID, commandExecutor);
         isRunning = false;
+        if (!scheduledCommands.isEmpty()) {
+            LOG.info("Unsubscribed with remaining commands: {}", 
scheduledCommands);
+        }
     }
 }

Reply via email to