zentol closed pull request #6731: [FLINK-10312] Propagate exception from server 
to client in REST API
URL: https://github.com/apache/flink/pull/6731
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07faf89..9eb8cc72c8f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -376,7 +376,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
                                (JobSubmitResponseBody jobSubmitResponseBody) 
-> new JobSubmissionResult(jobGraph.getJobID()))
                        .exceptionally(
                                (Throwable throwable) -> {
-                                       throw new CompletionException(new 
JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", 
throwable));
+                                       throw new CompletionException(new 
JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", 
ExceptionUtils.stripCompletionException(throwable)));
                                });
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index d4a65dec8f6..5a19a3f628d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -198,7 +198,10 @@
                                                if (throwable instanceof 
CancellationException) {
                                                        
resultFuture.completeExceptionally(new RetryException("Operation future was 
cancelled.", throwable));
                                                } else {
-                                                       if (retries > 0 && 
retryPredicate.test(throwable)) {
+                                                       throwable = 
ExceptionUtils.stripExecutionException(throwable);
+                                                       if 
(!retryPredicate.test(throwable)) {
+                                                               
resultFuture.completeExceptionally(throwable);
+                                                       } else if (retries > 0) 
{
                                                                final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
                                                                        () -> 
retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, 
retryPredicate, scheduledExecutor),
                                                                        
retryDelay.toMilliseconds(),
@@ -207,12 +210,10 @@
                                                                
resultFuture.whenComplete(
                                                                        
(innerT, innerThrowable) -> scheduledFuture.cancel(false));
                                                        } else {
-                                                               final String 
errorMsg = retries == 0 ?
-                                                                       "Number 
of retries has been exhausted." :
-                                                                       
"Exception is not retryable.";
-                                                               
resultFuture.completeExceptionally(new RetryException(
-                                                                       "Could 
not complete the operation. " + errorMsg,
-                                                                       
throwable));
+                                                               RetryException 
retryException = new RetryException(
+                                                                       "Could 
not complete the operation. Number of retries has been exhausted.",
+                                                                       
throwable);
+                                                               
resultFuture.completeExceptionally(retryException);
                                                        }
                                                }
                                        } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index e4cec086017..9cfb58e98a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
@@ -39,6 +40,7 @@
 
 import javax.annotation.Nonnull;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -80,43 +82,26 @@ protected AbstractRestHandler(
                }
 
                return response.whenComplete((P resp, Throwable throwable) -> {
-                       if (throwable != null) {
-
-                               Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
-
-                               if (error instanceof RestHandlerException) {
-                                       final RestHandlerException rhe = 
(RestHandlerException) error;
-
-                                       processRestHandlerException(ctx, 
httpRequest, rhe);
-                               } else {
-                                       log.error("Implementation error: 
Unhandled exception.", error);
-                                       HandlerUtils.sendErrorResponse(
-                                               ctx,
-                                               httpRequest,
-                                               new ErrorResponseBody("Internal 
server error."),
-                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR,
-                                               responseHeaders);
-                               }
-                       } else {
-                               HandlerUtils.sendResponse(
-                                       ctx,
-                                       httpRequest,
-                                       resp,
-                                       messageHeaders.getResponseStatusCode(),
-                                       responseHeaders);
-                       }
+                       Tuple2<ResponseBody, HttpResponseStatus> r = throwable 
!= null ?
+                               errorResponse(throwable) : Tuple2.of(resp, 
messageHeaders.getResponseStatusCode());
+                       HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, 
responseHeaders);
                }).thenApply(ignored -> null);
        }
 
-       private void processRestHandlerException(ChannelHandlerContext ctx, 
HttpRequest httpRequest, RestHandlerException rhe) {
-               log.error("Exception occurred in REST handler.", rhe);
-
-               HandlerUtils.sendErrorResponse(
-                       ctx,
-                       httpRequest,
-                       new ErrorResponseBody(rhe.getMessage()),
-                       rhe.getHttpResponseStatus(),
-                       responseHeaders);
+       private Tuple2<ResponseBody, HttpResponseStatus> 
errorResponse(Throwable throwable) {
+               Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
+               if (error instanceof RestHandlerException) {
+                       final RestHandlerException rhe = (RestHandlerException) 
error;
+                       log.error("Exception occurred in REST handler.", rhe);
+                       return Tuple2.of(new 
ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
+               } else {
+                       log.error("Implementation error: Unhandled exception.", 
error);
+                       String stackTrace = String.format("<Exception on server 
side:%n%s%nEnd of exception on server side>",
+                               ExceptionUtils.stringifyException(throwable));
+                       return Tuple2.of(
+                               new ErrorResponseBody(Arrays.asList("Internal 
server error.", stackTrace)),
+                               HttpResponseStatus.INTERNAL_SERVER_ERROR);
+               }
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index 491ba094f4c..e3101bef5ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -114,10 +114,7 @@ public JobSubmitHandler(
                CompletableFuture<Acknowledge> jobSubmissionFuture = 
finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, 
timeout));
 
                return jobSubmissionFuture.thenCombine(jobGraphFuture,
-                       (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()))
-                       .exceptionally(exception -> {
-                               throw new CompletionException(new 
RestHandlerException("Job submission failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
-                       });
+                       (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
        }
 
        private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody 
requestBody, Map<String, Path> nameToFile) throws MissingFileException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index c386952c056..99962a6848e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -82,7 +82,7 @@ public void testRetrySuccess() throws Exception {
                        TestingUtils.defaultExecutor());
 
                assertTrue(retryFuture.get());
-               assertTrue(retries == atomicInteger.get());
+               assertEquals(retries, atomicInteger.get());
        }
 
        /**
@@ -274,7 +274,7 @@ public void testRetryWithDelayAndPredicate() throws 
Exception {
                                        throwable instanceof RuntimeException 
&& throwable.getMessage().contains(retryableExceptionMessage),
                                new 
ScheduledExecutorServiceAdapter(retryExecutor)).get();
                } catch (final ExecutionException e) {
-                       assertThat(e.getMessage(), containsString("Could not 
complete the operation"));
+                       assertThat(e.getMessage(), containsString("should 
propagate"));
                } finally {
                        retryExecutor.shutdownNow();
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index be1cb797748..78ace967bec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -269,11 +269,7 @@ public void testFailedJobSubmission() throws Exception {
                                .get();
                } catch (Exception e) {
                        Throwable t = ExceptionUtils.stripExecutionException(e);
-                       if (t instanceof RestHandlerException){
-                               Assert.assertTrue(t.getMessage().equals("Job 
submission failed."));
-                       } else {
-                               throw e;
-                       }
+                       Assert.assertEquals(errorMessage, t.getMessage());
                }
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to