tillrohrmann commented on a change in pull request #17606:
URL: https://github.com/apache/flink/pull/17606#discussion_r740889079

##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
##########
@@ -240,17 +241,20 @@ private Object invokeRpc(Method method, Object[] args) throws Exception {
             final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
             final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
-            resultFuture.whenComplete(
-                    (resultValue, failure) -> {
-                        if (failure != null) {
-                            completableFuture.completeExceptionally(
-                                    resolveTimeoutException(
-                                            failure, callStackCapture, address, rpcInvocation));
-                        } else {
-                            completableFuture.complete(
-                                    deserializeValueIfNeeded(resultValue, method));
-                        }
-                    });
+            FutureUtils.forward(

Review comment:
I think you are right. We don't need the `forward` here.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));

Review comment:
I don't fully understand why this change is now required. Can we explain why this is the case?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org