Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4645#discussion_r139403702
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
}
private <P extends ResponseBody> CompletableFuture<P>
submitRequest(String targetAddress, int targetPort, FullHttpRequest
httpRequest, Class<P> responseClass) {
- return CompletableFuture.supplyAsync(() ->
bootstrap.connect(targetAddress, targetPort), executor)
- .thenApply((channel) -> {
- try {
- return channel.sync();
- } catch (InterruptedException e) {
- throw new FlinkRuntimeException(e);
- }
- })
- .thenApply((ChannelFuture::channel))
- .thenCompose(channel -> {
- ClientHandler handler =
channel.pipeline().get(ClientHandler.class);
- CompletableFuture<JsonResponse> future =
handler.getJsonFuture();
- channel.writeAndFlush(httpRequest);
- return future;
- }).thenComposeAsync(
- (JsonResponse rawResponse) ->
parseResponse(rawResponse, responseClass),
- executor
- );
+ ChannelFuture connect = bootstrap.connect(targetAddress,
targetPort);
+ Channel channel;
--- End diff --
That's true, generally I would also prefer making the entire method async.
For now I would still like to make this change though, and happily revisit this
later on when we have some more robust mechanism in place to deal with heavy
loads.
---