This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8119411addd [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished 8119411addd is described below commit 8119411addd9c82c15bab8480e7b35b8e6394d43 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Mon Jun 19 10:17:19 2023 +0800 [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished Close apache/flink#22819 --- .../client/program/rest/RestClusterClientTest.java | 37 ++++++++++++++++++++++ .../operators/collect/CollectResultFetcher.java | 7 ++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 2ff6dea2a98..740eb06a57b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -122,6 +123,7 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -1136,6 +1138,32 @@ class RestClusterClientTest { } } + @Test + void testSendCoordinationRequestException() throws Exception { + final TestClientCoordinationHandler handler = + new TestClientCoordinationHandler(new FlinkJobNotFoundException(jobId)); + try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) { + try (RestClusterClient<?> restClusterClient = + createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { + String payload = "testing payload"; + TestCoordinationRequest<String> request = new TestCoordinationRequest<>(payload); + + assertThatThrownBy( + () -> + restClusterClient + .sendCoordinationRequest( + jobId, new OperatorID(), request) + .get()) + .matches( + e -> + ExceptionUtils.findThrowableWithMessage( + e, + FlinkJobNotFoundException.class.getName()) + .isPresent()); + } + } + } + /** * The SUSPENDED job status should never be returned by the client thus client retries until it * either receives a different job status or the cluster is not reachable. @@ -1166,9 +1194,15 @@ class RestClusterClientTest { ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> { + @Nullable private final FlinkJobNotFoundException exception; private TestClientCoordinationHandler() { + this(null); + } + + private TestClientCoordinationHandler(@Nullable FlinkJobNotFoundException exception) { super(ClientCoordinationHeaders.getInstance()); + this.exception = exception; } @Override @@ -1178,6 +1212,9 @@ class RestClusterClientTest { @Nonnull DispatcherGateway gateway) throws RestHandlerException { try { + if (exception != null) { + throw exception; + } TestCoordinationRequest req = (TestCoordinationRequest) request.getRequestBody() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java index 519b7d603f1..a7916502214 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java @@ -128,12 +128,13 @@ public class CollectResultFetcher<T> { try { response = sendRequest(buffer.getVersion(), requestOffset); } catch (Exception e) { - if (ExceptionUtils.findThrowable( - e, UnavailableDispatcherOperationException.class) + if (ExceptionUtils.findThrowableWithMessage( + e, UnavailableDispatcherOperationException.class.getName()) .isPresent()) { LOG.debug( "The job execution has not started yet; cannot fetch results.", e); - } else if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class) + } else if (ExceptionUtils.findThrowableWithMessage( + e, FlinkJobNotFoundException.class.getName()) .isPresent()) { LOG.debug( "The job cannot be found. It is very likely that the job is not in a RUNNING state.",