[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

2018-07-03 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6222


---



2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199783421
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

I would be in favor of approach 3 because we do something similar for `JobExecutionResult`/`JobResult`. We could then rethrow the exception in the `RestClusterClient`. I also agree that this is something we can add as a follow-up. @zentol, can you please create a JIRA issue for this?


---



2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199719645
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

Note that this discussion isn't blocking the PR from being merged, since whatever we decide would effectively be an extension of it.


---



2018-07-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199477966
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

Well, there's no doubt that it _could_ be helpful; my point is that it can be _harmful_ if not done properly.

`submitJob` should either give the `JobSubmitHandler` a means to detect these exceptions and create adequate responses, or explicitly throw exceptions whose messages we can safely pass on to users.

That said, I don't know how to do either of these things well. 😞

For completeness' sake, here are the ideas that came to mind:

## 1
Introduce a special `FlinkUserFacingException` that we "trust" to contain a 
good error message.

Con: This provides little additional safety and will never provide a proper HTTP response code.
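A minimal sketch of idea 1, assuming a hypothetical marker class `FlinkUserFacingException` (the name comes from the proposal; no such class exists in Flink). Only messages from exceptions carrying the marker are forwarded; everything else is reduced to the generic text. It uses only the JDK, so it runs without Flink.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of idea 1 (hypothetical names): forward only messages of exceptions
// explicitly marked as safe for users; hide everything else.
final class UserFacingDemo {

    /** Hypothetical marker exception whose message is trusted to be shown to users. */
    static final class FlinkUserFacingException extends Exception {
        FlinkUserFacingException(String message) {
            super(message);
        }
    }

    /** Searches the cause chain for a trusted exception and builds the client-facing message. */
    static String userVisibleMessage(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof FlinkUserFacingException) {
                return "Job submission failed: " + t.getMessage();
            }
        }
        // Untrusted messages are never forwarded to the client.
        return "Job submission failed.";
    }

    public static void main(String[] args) {
        CompletableFuture<String> submission = new CompletableFuture<>();
        submission.completeExceptionally(new FlinkUserFacingException("Job is already running."));

        // The marker says nothing about the HTTP status, so a real handler would
        // still have to use one blanket code (e.g. 500) for every failure.
        String message = submission
            .thenApply(ack -> "submitted")
            .exceptionally(UserFacingDemo::userVisibleMessage)
            .join();
        System.out.println(message); // Job submission failed: Job is already running.
    }
}
```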

## 2
Introduce dedicated exceptions for the scenarios that you listed and explicitly look for them in the `exceptionally` block, i.e.:
```java
.exceptionally(exception -> {
    // exceptionally() after thenApply() receives a CompletionException wrapper, so unwrap before matching
    Throwable cause = ExceptionUtils.stripCompletionException(exception);
    if (cause instanceof JobAlreadyExistsException) {
        throw new CompletionException(new RestHandlerException("Job already exists.", HttpResponseStatus.BAD_REQUEST, exception));
    } else {
        throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    }
})
```

Con: Obviously, this approach is inherently flawed, as there is no guarantee about which exceptions can actually be thrown; we would have to keep the checks in sync with the actual implementation manually, because `CompletableFuture`s throw a wrench into sane exception handling. 😡
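The difficulty behind idea 2 can be reproduced with the JDK alone: when `exceptionally` is attached after `thenApply`, it receives a `CompletionException` wrapping the original failure, so a plain `instanceof` check never matches. The sketch uses a hypothetical `JobAlreadyExistsException` and a hand-rolled `strip` helper; Flink's `ExceptionUtils` has comparable helpers (the test in this PR uses `stripExecutionException`).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Shows why matching on exception types inside exceptionally() is fragile:
// a dependent stage hands over a CompletionException wrapper, not the original.
final class WrappingDemo {

    /** Hypothetical dedicated exception, as proposed in idea 2. */
    static final class JobAlreadyExistsException extends Exception {
        JobAlreadyExistsException(String message) {
            super(message);
        }
    }

    /** Removes CompletionException layers to reach the original failure. */
    static Throwable strip(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Maps the unwrapped failure to an HTTP status name, mirroring the snippet in idea 2. */
    static String classify(Throwable exception) {
        return strip(exception) instanceof JobAlreadyExistsException
            ? "BAD_REQUEST"
            : "INTERNAL_SERVER_ERROR";
    }

    public static void main(String[] args) {
        CompletableFuture<String> submission = new CompletableFuture<>();
        submission.completeExceptionally(new JobAlreadyExistsException("Job already exists."));

        String status = submission
            .thenApply(ack -> "OK")
            .exceptionally(exception -> {
                // Prints false: this stage receives a CompletionException.
                System.out.println(exception instanceof JobAlreadyExistsException);
                return classify(exception);
            })
            .join();
        System.out.println(status); // BAD_REQUEST
    }
}
```

Whether the wrapper appears depends on where `exceptionally` sits in the chain: attached directly to the failed future, it would see the original exception. That implicit behaviour is part of why keeping such checks in sync is hard.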

## 3
Encode possible user-facing exceptions in the return value of `submitJob`, i.e. return an `AckOrException`:
```java
public class AckOrException {
    // holds exception, could also be a series of nullable fields
    private final SuperEither exception;
    ...
    public void throwIfError() throws ExceptionA, ExceptionB, ExceptionC;
}
```
Con: Relies on users to call `throwIfError` and introduces an entirely 
separate channel for passing errors, but it would allow exception matching.
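A runnable sketch of idea 3, with a plain nullable field standing in for `SuperEither` and two hypothetical exception types standing in for `ExceptionA`/`ExceptionB`. The overloaded factory methods restrict what can be stored, so `throwIfError` can declare exactly which checked exceptions callers must handle, which is what makes the exception matching possible.

```java
// Sketch of idea 3 with hypothetical types: the result carries either an
// acknowledgement or one of a fixed set of user-facing exceptions.
final class AckOrExceptionDemo {

    /** Hypothetical user-facing failures (stand-ins for ExceptionA/B). */
    static final class JobAlreadyExistsException extends Exception {
        JobAlreadyExistsException(String message) {
            super(message);
        }
    }

    static final class InvalidJobException extends Exception {
        InvalidJobException(String message) {
            super(message);
        }
    }

    static final class AckOrException {
        // null means the submission was acknowledged
        private final Exception exception;

        private AckOrException(Exception exception) {
            this.exception = exception;
        }

        static AckOrException ack() {
            return new AckOrException(null);
        }

        // Overloads restrict which exception types can be stored.
        static AckOrException of(JobAlreadyExistsException e) {
            return new AckOrException(e);
        }

        static AckOrException of(InvalidJobException e) {
            return new AckOrException(e);
        }

        /** Rethrows the stored failure with its declared type; does nothing on success. */
        void throwIfError() throws JobAlreadyExistsException, InvalidJobException {
            if (exception instanceof JobAlreadyExistsException) {
                throw (JobAlreadyExistsException) exception;
            }
            if (exception instanceof InvalidJobException) {
                throw (InvalidJobException) exception;
            }
        }
    }

    /** What a REST handler (or a client) could do: match on the declared types. */
    static String respond(AckOrException result) {
        try {
            result.throwIfError();
            return "submitted";
        } catch (JobAlreadyExistsException | InvalidJobException e) {
            return "BAD_REQUEST: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(respond(AckOrException.ack())); // submitted
        System.out.println(respond(AckOrException.of(new JobAlreadyExistsException("Job already exists."))));
    }
}
```

A client such as the `RestClusterClient` could call `throwIfError` the same way; the stated Con still applies, since nothing forces a caller to invoke it.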


---



2018-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199333683
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

I see your point. I'm just wondering whether some context wouldn't be helpful on the client side when using the CLI. For example, if the job was misconfigured, or if it had already been submitted to the cluster in HA mode, it would be helpful for the user to know.


---



2018-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199248513
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

I'm not particularly fond of the idea. As alluded to in the PR description and the JIRA issue, I agree that the existing error message isn't _helpful_, but it is better than the current state.

I rather like that, so far, the REST API has control over the error messages. This ensures that users only see messages that were actually meant for them.

In contrast, exception messages are pretty much arbitrary. They may change at will, their audience isn't defined (user vs. dev), they may only be helpful if the full stack trace is present, they often have no message at all (see usages of `Preconditions`, or NPEs), and they typically only describe what went wrong, not why, how to fix it, or whether it was even a user error. Since this would break down the barrier between internal and user-facing messages, you would also run into cases where users have _no idea_ what the message even means. Finally, you end up with mismatches between the error message and the error code.

To me, the underlying issue is that `submitJob` funnels all manner of exceptions into a `FlinkException`/`JobSubmissionException` that we can't do much with. We can neither categorize them in any way, nor distinguish who is responsible (user vs. Flink), nor tell when in the process the failure occurred.
Without diving into the implementation you don't _even know which exceptions are thrown_, but I suppose this is a general issue with `CompletableFuture`s.


---



2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199170668
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -87,4 +89,33 @@ public void testSuccessfulJobSubmission() throws Exception {
handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
.get();
}
+
+   @Test
+   public void testFailedJobSubmission() throws Exception {
+   final String errorMessage = "test";
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
+   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
--- End diff --

No need to create a mock; `() -> CompletableFuture.completedFuture(mockGateway)` should be good enough.
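The suggestion works because a lambda can implement any interface with a single abstract method. The sketch assumes `GatewayRetriever` fits that shape (the suggested `() -> ...` implies it does) and uses hypothetical, simplified stand-ins `Gateway` and `Retriever` instead of Flink's interfaces, so it runs on the JDK alone.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Sketch with hypothetical stand-ins: any interface with a single abstract
// method can be implemented by a lambda, so no mocking framework is needed.
final class LambdaRetrieverDemo {

    /** Hypothetical, simplified stand-in for DispatcherGateway. */
    interface Gateway {
        CompletableFuture<String> submitJob(String jobName);
    }

    /** Hypothetical stand-in for GatewayRetriever with one abstract method. */
    interface Retriever<T> {
        CompletableFuture<T> getFuture();

        // A default method does not stop the interface from being lambda-friendly.
        default Optional<T> getNow() {
            CompletableFuture<T> future = getFuture();
            return future.isDone() && !future.isCompletedExceptionally()
                ? Optional.ofNullable(future.join())
                : Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The gateway can be a lambda too: every submission fails.
        Gateway failingGateway = jobName -> {
            CompletableFuture<String> result = new CompletableFuture<>();
            result.completeExceptionally(new Exception("test"));
            return result;
        };

        // Instead of mock(GatewayRetriever.class):
        Retriever<Gateway> retriever = () -> CompletableFuture.completedFuture(failingGateway);

        Gateway gateway = retriever.getNow().orElseThrow(IllegalStateException::new);
        System.out.println(gateway.submitJob("testjob").isCompletedExceptionally()); // true
    }
}
```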


---



2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199170250
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -66,6 +67,9 @@ public JobSubmitHandler(
}
 
return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+   .exceptionally(exception -> {
+   throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
--- End diff --

Maybe we could add `exception.getMessage()` to the `message` of the `RestHandlerException`. Otherwise the user will only see `"Job submission failed."` in the `ErrorResponseBody`. With the change it could be `"Job submission failed: Failure cause"`.
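A minimal sketch of the suggested message, using a hypothetical helper `submissionFailureMessage`. It unwraps the `CompletionException` that `exceptionally` receives after `thenApply` and falls back to the generic text when the cause has no message, since exceptions such as NPEs often carry none.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch (hypothetical helper): append the cause's message when there is one.
final class FailureMessageDemo {

    /** Builds "Job submission failed: <cause>", falling back to the generic text. */
    static String submissionFailureMessage(Throwable exception) {
        Throwable cause = exception;
        // A dependent stage sees the original failure wrapped in a CompletionException.
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        String detail = cause.getMessage();
        // Many exceptions (e.g. a bare NullPointerException) carry no message at all.
        return detail == null || detail.isEmpty()
            ? "Job submission failed."
            : "Job submission failed: " + detail;
    }

    public static void main(String[] args) {
        CompletableFuture<String> submission = new CompletableFuture<>();
        submission.completeExceptionally(new Exception("Failure cause"));

        // In the handler this string would become the RestHandlerException message;
        // here it is returned directly so the sketch runs without Flink.
        String message = submission
            .thenApply(ack -> "/jobs/<id>")
            .exceptionally(FailureMessageDemo::submissionFailureMessage)
            .join();
        System.out.println(message); // Job submission failed: Failure cause
    }
}
```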


---



2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6222#discussion_r199171079
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -87,4 +89,33 @@ public void testSuccessfulJobSubmission() throws Exception {
handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
.get();
}
+
+   @Test
+   public void testFailedJobSubmission() throws Exception {
+   final String errorMessage = "test";
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
+   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT,
+   Collections.emptyMap());
+
+   JobGraph job = new JobGraph("testjob");
+   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+   .get();
+   } catch (Exception e) {
+   Throwable t = ExceptionUtils.stripExecutionException(e);
+   if (t instanceof RestHandlerException){
+   Assert.assertTrue(t.getMessage().equals("Job submission failed."));
+   } else {
+   throw e;
+   }
+   }
--- End diff --

I think we should make sure that `errorMessage` is part of the 
`RestHandlerException#message`. Otherwise this information won't be sent to the 
client in the form of the `ErrorResponseBody`.
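A sketch of the check being asked for: assert that `errorMessage` ends up in the handler exception's message, rather than only checking the exception type. `HandlerFailure` is a hypothetical stand-in for `RestHandlerException`, and the handler is simplified so the example runs without Flink, Mockito, or JUnit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch of the check requested in the review, using hypothetical stand-ins
// so it runs without Flink or JUnit.
final class FailedSubmissionCheck {

    /** Hypothetical stand-in for RestHandlerException. */
    static final class HandlerFailure extends RuntimeException {
        HandlerFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Simplified handler: wraps any submission failure but keeps the cause's message. */
    static CompletableFuture<String> handle(CompletableFuture<String> submission) {
        return submission
            .thenApply(ack -> "/jobs/<id>")
            .exceptionally(exception -> {
                Throwable cause = exception instanceof CompletionException && exception.getCause() != null
                    ? exception.getCause()
                    : exception;
                throw new CompletionException(
                    new HandlerFailure("Job submission failed: " + cause.getMessage(), cause));
            });
    }

    /** Returns the handler failure's message; fails loudly if the future did not fail that way. */
    static String failureMessage(CompletableFuture<String> result) {
        try {
            result.join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof HandlerFailure) {
                return e.getCause().getMessage();
            }
            throw new AssertionError("unexpected failure type", e.getCause());
        }
        throw new AssertionError("expected the submission to fail");
    }

    public static void main(String[] args) {
        final String errorMessage = "test";
        CompletableFuture<String> submission = new CompletableFuture<>();
        submission.completeExceptionally(new Exception(errorMessage));

        String message = failureMessage(handle(submission));
        // Assert on the content, not just the type: errorMessage must reach the client.
        if (!message.contains(errorMessage)) {
            throw new AssertionError("errorMessage missing from: " + message);
        }
        System.out.println(message); // Job submission failed: test
    }
}
```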


---



2018-06-28 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6222

[FLINK-8785][rest] Handle JobSubmissionExceptions

## What is the purpose of the change

This PR modifies the `JobSubmitHandler` to handle exceptions contained in 
the future returned by `DispatcherGateway#submitJob`.

An exception handler was added via `CompletableFuture#exceptionally` to 
return a proper `ErrorResponseBody` signaling that the job submission has 
failed.

This PR is essentially the bare-bones solution; in the JIRA issue I advocated including the error messages from exceptions, since there are various reasons why the submission could fail, but I couldn't find a satisfying solution.

## Verifying this change

* see new test in `JobSubmitHandlerTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8785_basic

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6222


commit 32fe49270596cdcf2f91f822c3a6504a14ba40eb
Author: zentol 
Date:   2018-06-28T08:57:01Z

[FLINK-8785][rest] Handle JobSubmissionExceptions




---