Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4645#discussion_r137065666
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
        }
     
        private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
    -           return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
    -                   .thenApply((channel) -> {
    -                           try {
    -                                   return channel.sync();
    -                           } catch (InterruptedException e) {
    -                                   throw new FlinkRuntimeException(e);
    -                           }
    -                   })
    -                   .thenApply((ChannelFuture::channel))
    -                   .thenCompose(channel -> {
    -                           ClientHandler handler = channel.pipeline().get(ClientHandler.class);
    -                           CompletableFuture<JsonResponse> future = handler.getJsonFuture();
    -                           channel.writeAndFlush(httpRequest);
    -                           return future;
    -                   }).thenComposeAsync(
    -                           (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
    -                           executor
    -                   );
    +           ChannelFuture connect = bootstrap.connect(targetAddress, targetPort);
    +           Channel channel;
    --- End diff --
    
    Technically yes, this alone would fix the problem at hand.
    
    The sending part was modified as a band-aid to alleviate issues that can 
occur under heavy load, since we don't have any throttling mechanism for 
accepting requests. For example, in one of our tests thousands of futures 
suddenly failed because all ports were in use, not to mention that you can 
allocate a virtually unbounded number of futures (way faster than we could 
ever process them).
    
    I couldn't quickly come up with a good scheme to throttle requests though. 
Suggestions are welcome ;)
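    To make the idea concrete, here is a minimal sketch of one possible throttling scheme, not taken from the PR: cap the number of in-flight requests with a plain `java.util.concurrent.Semaphore` and fail fast once the cap is reached, instead of letting futures and connections pile up. The class name `ThrottledSubmitter` and the permit count are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: bound the number of concurrently in-flight requests.
// Requests beyond the cap fail immediately rather than queueing up futures.
public class ThrottledSubmitter {
	private final Semaphore permits;

	public ThrottledSubmitter(int maxInFlight) {
		this.permits = new Semaphore(maxInFlight);
	}

	public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
		if (!permits.tryAcquire()) {
			// Fail fast: no permit available, so reject instead of connecting.
			CompletableFuture<T> rejected = new CompletableFuture<>();
			rejected.completeExceptionally(
				new IllegalStateException("too many in-flight requests"));
			return rejected;
		}
		// Release the permit once the request completes, success or failure.
		return request.get().whenComplete((result, error) -> permits.release());
	}
}
```

    This rejects eagerly; a variant could instead block or queue callers up to a bound, which trades fast failure for backpressure.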

