This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 6e7703738cd [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
6e7703738cd is described below

commit 6e7703738cdefed17277ea86d2c9dc25393eceac
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Sat Feb 25 21:24:46 2023 +0000

    [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase

    - Only obtain execution exception if the job is in globally terminal state
    - [hotfix] Unsubscribe finished TestEventSources from test commands. Otherwise, any command with SINGLE_SUBTASK scope might be dispatched to a finished source. This will result in TestJobExecutor.waitForFailover timing out while waiting for the command to be executed and ACKed.
    - [hotfix] Mark TestEventSource.scheduledCommands volatile
    - [hotfix] Make sure to process all commands in TestEventSource
---
 .../operators/lifecycle/TestJobExecutor.java | 25 +++++++++++----------
 .../command/SharedTestCommandDispatcher.java | 5 +++++
 .../lifecycle/command/TestCommandDispatcher.java | 2 ++
 .../command/TestCommandDispatcherImpl.java | 5 +++++
 .../operators/lifecycle/graph/TestEventSource.java | 23 ++++++++++++++++----
 5 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
index 9082702c3d1..5df5fc4fdf4 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
@@ -164,22 +164,23 @@ public class TestJobExecutor { } private void handleFailoverTimeout(TimeoutException e) throws Exception { + JobStatus jobStatus = miniClusterResource.getClusterClient().getJobStatus(jobID).get(); String message = String.format( "Unable to failover the job: %s; job status: %s", 
e.getMessage(), - miniClusterResource.getClusterClient().getJobStatus(jobID).get()); - Optional<SerializedThrowable> throwable = - miniClusterResource - .getClusterClient() - .requestJobResult(jobID) - .get() - .getSerializedThrowable(); - if (throwable.isPresent()) { - throw new RuntimeException(message, throwable.get()); - } else { - throw new RuntimeException(message); + e.getMessage(), jobStatus); + if (jobStatus.isGloballyTerminalState()) { + Optional<SerializedThrowable> throwable = + miniClusterResource + .getClusterClient() + .requestJobResult(jobID) + .get() + .getSerializedThrowable(); + if (throwable.isPresent()) { + throw new RuntimeException(message, throwable.get()); + } } + throw new RuntimeException(message); } public TestJobExecutor sendBroadcastCommand(TestCommand command, TestCommandScope scope) { diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java index 3ad52515808..d91c96f5cbc 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java @@ -45,4 +45,9 @@ class SharedTestCommandDispatcher implements TestCommandDispatcher { public void broadcast(TestCommand testCommand, TestCommandScope scope) { ref.get().broadcast(testCommand, scope); } + + @Override + public void unsubscribe(String operatorID, CommandExecutor commandExecutor) { + ref.get().unsubscribe(operatorID, commandExecutor); + } } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java index cdf46495102..3af13663b48 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java @@ -33,6 +33,8 @@ public interface TestCommandDispatcher extends Serializable { void broadcast(TestCommand command, TestCommandScope scope); + void unsubscribe(String operatorID, CommandExecutor commandExecutor); + /** An executor of {@link TestCommand}s. */ interface CommandExecutor { void execute(TestCommand testCommand); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java index fc6f8f9386c..f815a38ae19 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java @@ -52,6 +52,11 @@ class TestCommandDispatcherImpl implements TestCommandDispatcher { .collect(Collectors.toList())); } + @Override + public void unsubscribe(String operatorID, CommandExecutor commandExecutor) { + subscribers.getOrDefault(operatorID, emptyList()).remove(commandExecutor); + } + private void executeInternal( TestCommand command, TestCommandScope scope, List<CommandExecutor> executors) { checkState(!executors.isEmpty(), "no executors for command: " + command); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java index 5d0fd80276a..6a778770558 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java @@ -20,6 
+20,7 @@ package org.apache.flink.runtime.operators.lifecycle.graph; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.lifecycle.command.TestCommand; import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher; +import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.CommandExecutor; import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent; import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent; import org.apache.flink.runtime.operators.lifecycle.event.TestEvent; @@ -28,6 +29,9 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; @@ -40,11 +44,13 @@ import static org.apache.flink.runtime.operators.lifecycle.command.TestCommand.F */ public class TestEventSource extends RichSourceFunction<TestDataElement> implements ParallelSourceFunction<TestDataElement> { + private static final Logger LOG = LoggerFactory.getLogger(TestEventSource.class); private final String operatorID; private final TestCommandDispatcher commandQueue; - private transient Queue<TestCommand> scheduledCommands; + private transient volatile Queue<TestCommand> scheduledCommands; private transient volatile boolean isRunning = true; private final TestEventQueue eventQueue; + private transient volatile CommandExecutor commandExecutor; public TestEventSource( String operatorID, TestEventQueue eventQueue, TestCommandDispatcher commandQueue) { @@ -58,7 +64,8 @@ public class TestEventSource extends RichSourceFunction<TestDataElement> super.open(parameters); this.isRunning = true; this.scheduledCommands = new LinkedBlockingQueue<>(); - this.commandQueue.subscribe(cmd -> 
scheduledCommands.add(cmd), operatorID); + this.commandExecutor = cmd -> scheduledCommands.add(cmd); + this.commandQueue.subscribe(commandExecutor, operatorID); this.eventQueue.add( new OperatorStartedEvent( operatorID, @@ -69,12 +76,12 @@ public class TestEventSource extends RichSourceFunction<TestDataElement> @Override public void run(SourceContext<TestDataElement> ctx) { long lastSent = 0; - while (isRunning) { + while (isRunning || !scheduledCommands.isEmpty()) { // Don't finish the source if it has not sent at least one value. TestCommand cmd = lastSent == 0 ? null : scheduledCommands.poll(); if (cmd == FINISH_SOURCES) { ack(cmd); - isRunning = false; + stop(); } else if (cmd == FAIL) { ack(cmd); throw new RuntimeException("requested to fail"); @@ -103,6 +110,14 @@ public class TestEventSource extends RichSourceFunction<TestDataElement> @Override public void cancel() { + stop(); + } + + private void stop() { + commandQueue.unsubscribe(operatorID, commandExecutor); isRunning = false; + if (!scheduledCommands.isEmpty()) { + LOG.info("Unsubscribed with remaining commands: {}", scheduledCommands); + } } }