This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8119411addd [FLINK-32370][streaming] Fix warn log in result fetcher 
when job is finished
8119411addd is described below

commit 8119411addd9c82c15bab8480e7b35b8e6394d43
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Mon Jun 19 10:17:19 2023 +0800

    [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished
    
    Close apache/flink#22819
---
 .../client/program/rest/RestClusterClientTest.java | 37 ++++++++++++++++++++++
 .../operators/collect/CollectResultFetcher.java    |  7 ++--
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ff6dea2a98..740eb06a57b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -122,6 +123,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -1136,6 +1138,32 @@ class RestClusterClientTest {
         }
     }
 
+    @Test
+    void testSendCoordinationRequestException() throws Exception {
+        final TestClientCoordinationHandler handler =
+                new TestClientCoordinationHandler(new 
FlinkJobNotFoundException(jobId));
+        try (TestRestServerEndpoint restServerEndpoint = 
createRestServerEndpoint(handler)) {
+            try (RestClusterClient<?> restClusterClient =
+                    
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+                String payload = "testing payload";
+                TestCoordinationRequest<String> request = new 
TestCoordinationRequest<>(payload);
+
+                assertThatThrownBy(
+                                () ->
+                                        restClusterClient
+                                                .sendCoordinationRequest(
+                                                        jobId, new 
OperatorID(), request)
+                                                .get())
+                        .matches(
+                                e ->
+                                        
ExceptionUtils.findThrowableWithMessage(
+                                                        e,
+                                                        
FlinkJobNotFoundException.class.getName())
+                                                .isPresent());
+            }
+        }
+    }
+
     /**
      * The SUSPENDED job status should never be returned by the client thus 
client retries until it
      * either receives a different job status or the cluster is not reachable.
@@ -1166,9 +1194,15 @@ class RestClusterClientTest {
                     ClientCoordinationRequestBody,
                     ClientCoordinationResponseBody,
                     ClientCoordinationMessageParameters> {
+        @Nullable private final FlinkJobNotFoundException exception;
 
         private TestClientCoordinationHandler() {
+            this(null);
+        }
+
+        private TestClientCoordinationHandler(@Nullable 
FlinkJobNotFoundException exception) {
             super(ClientCoordinationHeaders.getInstance());
+            this.exception = exception;
         }
 
         @Override
@@ -1178,6 +1212,9 @@ class RestClusterClientTest {
                 @Nonnull DispatcherGateway gateway)
                 throws RestHandlerException {
             try {
+                if (exception != null) {
+                    throw exception;
+                }
                 TestCoordinationRequest req =
                         (TestCoordinationRequest)
                                 request.getRequestBody()
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
index 519b7d603f1..a7916502214 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
@@ -128,12 +128,13 @@ public class CollectResultFetcher<T> {
                 try {
                     response = sendRequest(buffer.getVersion(), requestOffset);
                 } catch (Exception e) {
-                    if (ExceptionUtils.findThrowable(
-                                    e, 
UnavailableDispatcherOperationException.class)
+                    if (ExceptionUtils.findThrowableWithMessage(
+                                    e, 
UnavailableDispatcherOperationException.class.getName())
                             .isPresent()) {
                         LOG.debug(
                                 "The job execution has not started yet; cannot 
fetch results.", e);
-                    } else if (ExceptionUtils.findThrowable(e, 
FlinkJobNotFoundException.class)
+                    } else if (ExceptionUtils.findThrowableWithMessage(
+                                    e, 
FlinkJobNotFoundException.class.getName())
                             .isPresent()) {
                         LOG.debug(
                                 "The job cannot be found. It is very likely 
that the job is not in a RUNNING state.",

Reply via email to