zentol commented on a change in pull request #17606:
URL: https://github.com/apache/flink/pull/17606#discussion_r740932666



##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));

Review comment:
       The issue depends on when the first heartbeat is sent after the TM went down.
   
   If it happens before the restart, then we properly remove the TM and don't use it later on.
   If it does not happen before the restart, then the job fails a second time later on, while attempting to deploy tasks to the same TM.
   
   I think this change just ever so slightly adjusts the timings to make this failure more common; it can already happen on the current master.
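To make the consequence concrete: `fixedDelayRestart(1, 1500L)` allows a single restart, so the second failure described above fails the test, while 10 attempts absorb it. A minimal sketch that models the restart budget as a plain counter; `RestartBudgetSketch` and `canRestart` are hypothetical names, not Flink's `RestartStrategies` implementation:

```java
// Hypothetical model of a fixed-delay restart budget (not Flink's implementation):
// the job keeps running as long as the number of failures does not exceed the
// number of allowed restart attempts.
final class RestartBudgetSketch {

    /** True if a job with maxAttempts restart attempts survives the given number of failures. */
    static boolean canRestart(int maxAttempts, int failures) {
        return failures <= maxAttempts;
    }

    public static void main(String[] args) {
        // Failure 1: TM1 is killed. Failure 2: deploying tasks to the dead TM1.
        int failures = 2;
        System.out.println("fixedDelayRestart(1, 1500L) survives: " + canRestart(1, failures));   // false
        System.out.println("fixedDelayRestart(10, 1500L) survives: " + canRestart(10, failures)); // true
    }
}
```

Under this model, raising the attempt count lets the test tolerate the second failure without changing whether that failure occurs.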

##########

Review comment:
       The issue depends on when the first heartbeat is sent after the TM went down (and the TM would then be considered unreachable).
   
   If it happens before the restart, then we properly remove the TM and don't use it later on.
   If it does not happen before the restart, then the job fails a second time later on, while attempting to deploy tasks to the same TM.
   
   I think this change just ever so slightly adjusts the timings to make this failure more common; it can already happen on the current master.

##########

Review comment:
       - JM sends heartbeat to TM1 (could be transmitted, but may not get a response)
   - TM1 is killed
   - job restarts (not delayed by the missing TM1 because cancelTask RPCs fail immediately)
   - JM sends heartbeat to TM1 (could not be transmitted)
   - job restarts a second time

##########

Review comment:
       - JM sends heartbeat to TM1 (transmitted, but may not get a response)
   - TM1 is killed
   - job restarts (not delayed by the missing TM1 because cancelTask RPCs fail immediately)
   - JM sends heartbeat to TM1 (not transmitted)
   - job restarts a second time

##########

Review comment:
       - JM sends heartbeat request to TM1 (transmitted, but may not get a response)
   - TM1 is killed
   - job restarts (not delayed by the missing TM1 because cancelTask RPCs fail immediately)
   - JM sends heartbeat request to TM1 (not transmitted)
   - job restarts a second time
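The sequence above boils down to an ordering question: is the dead TM detected by a heartbeat request before the job is redeployed? A rough timing model, assuming heartbeat requests go out at fixed multiples of an interval and that the first request after the kill is what detects the TM; the class name and the 10 s interval are illustrative assumptions, and real scheduling is less regular:

```java
// Hypothetical timing model of the race (not Flink code).
// Assumptions: heartbeat requests are sent every intervalMs starting at t=0,
// the TM dies at killAtMs, and the job is redeployed restartDelayMs later.
// The dead TM is only removed if a heartbeat request after the kill
// happens (and is handled) before the redeploy.
final class HeartbeatRaceSketch {

    /** Time of the first heartbeat request sent strictly after the kill. */
    static long firstHeartbeatAfter(long killAtMs, long intervalMs) {
        return (killAtMs / intervalMs + 1) * intervalMs;
    }

    /** True if the dead TM is detected (and removed) before the job is redeployed. */
    static boolean removedBeforeRedeploy(long killAtMs, long intervalMs, long restartDelayMs) {
        long redeployAtMs = killAtMs + restartDelayMs;
        return firstHeartbeatAfter(killAtMs, intervalMs) < redeployAtMs;
    }

    public static void main(String[] args) {
        long interval = 10_000L; // illustrative heartbeat interval
        long delay = 1_500L;     // restart delay used by the test
        System.out.println(removedBeforeRedeploy(9_000L, interval, delay)); // true: request at 10000, redeploy at 10500
        System.out.println(removedBeforeRedeploy(2_000L, interval, delay)); // false: request at 10000, redeploy at 3500
    }
}
```

In this model, a short restart delay relative to the heartbeat interval makes the second failure the common case rather than the exception.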

##########

Review comment:
       I can also reproduce it locally, where there are no processing gaps.

##########

Review comment:
       The restart delay is a fair point; I'll check the logs again.

##########

Review comment:
       It looks like the failed heartbeats are being ignored while we are waiting for the restart delay:
   
   ```
   // 64589-0ef458 is the killed TM
   
   // this is the last heartbeat
   17532 o.a.f.r.jobmaster.JobMaster [] - Received heartbeat from 127.0.0.1:64589-0ef458.
   ...
   <start job restart>
   <cancel slot requests>
   <cleanup partitions>
   <various failed cancelTask RPCs>
   17740 <reduce resource requirements to 0>
   ...
   17440... JM idling, sending heartbeat requests
   19212 o.a.f.r.jobmaster.JobMaster [] - Archive local failure causing attempt 05bcf9159a5a301d2f7b6566111235da to fail
   ...
   19213  o.a.f.r.executiongraph.ExecutionGraph [] - Job Flink Java Job at Mon Nov 01 15:42:30 CET 2021 (4daf5dcbf65f7cd384ac228ad72ab5c6) switched from state RESTARTING to RUNNING.
   19777  o.a.f.r.jobmaster.JobMaster [] - TaskManager with id 127.0.0.1:64589-0ef458 is no longer reachable.
   19777 o.a.f.r.jobmaster.JobMaster [] - Disconnect TaskExecutor 127.0.0.1:64589-0ef458 because: TaskManager with id 127.0.0.1:64589-0ef458 is no longer reachable.
   ```

##########

Review comment:
       I'm wondering if the exception the `HeartbeatMonitorImpl` sees could be wrapped in a `CompletionException`...
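A small, self-contained illustration of the suspected wrapping: a failure observed through a dependent `CompletableFuture` stage (here `thenApply`) arrives as a `CompletionException`, so a plain `instanceof` check on the top-level exception misses it. `UnreachableException` is a hypothetical stand-in for `RecipientUnreachableException`; this sketches the JDK behavior, not the actual RPC code path:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Shows how a dependent CompletableFuture stage wraps a failure in CompletionException.
// UnreachableException is a hypothetical stand-in for RecipientUnreachableException.
final class CompletionWrappingDemo {

    static final class UnreachableException extends Exception {
        UnreachableException(String message) {
            super(message);
        }
    }

    /** A handler that only checks the top-level exception type. */
    static boolean naiveIsUnreachable(Throwable failure) {
        return failure instanceof UnreachableException;
    }

    /** Returns the exception that a callback on a dependent stage observes. */
    static Throwable failureSeenByDependentStage() {
        CompletableFuture<String> rpc = new CompletableFuture<>();
        rpc.completeExceptionally(new UnreachableException("TM1 is gone"));

        // thenApply creates a dependent stage; it completes with a CompletionException
        // whose cause is the original UnreachableException.
        CompletableFuture<String> derived = rpc.thenApply(response -> response);

        Throwable[] seen = new Throwable[1];
        // derived is already complete, so this callback runs immediately on this thread.
        derived.whenComplete((response, error) -> seen[0] = error);
        return seen[0];
    }

    public static void main(String[] args) {
        Throwable seen = failureSeenByDependentStage();
        System.out.println(seen instanceof CompletionException); // true
        System.out.println(naiveIsUnreachable(seen));            // false
        System.out.println(naiveIsUnreachable(seen.getCause())); // true
    }
}
```

A `whenComplete` callback registered directly on `rpc` would see the raw exception; only the dependent stage wraps it, which is why a refactoring of the future chain can silently change what the handler receives.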

##########

Review comment:
       💢 
   ```
   21440 o.a.f.r.j.JobMaster [] - #handleHeartbeatRpcFailure exception
   java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException
   ```

##########

Review comment:
       I suppose stripping the `CompletionException` in the `HeartbeatManagerImpl` should be done in any case, because it is so easy to introduce bugs like this.
   
   I'm curious, though, whether we should revert the `AkkaInvocationHandler` to manually forwarding the result again...
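A minimal sketch of stripping the wrapper before classifying a heartbeat RPC failure. `stripCompletion` follows the same idea as Flink's `ExceptionUtils.stripCompletionException`; `UnreachableException` and `isTargetUnreachable` are hypothetical names, not the actual `HeartbeatManagerImpl` code:

```java
import java.util.concurrent.CompletionException;

// Sketch: unwrap CompletionException before classifying a heartbeat RPC failure.
// UnreachableException is a hypothetical stand-in for RecipientUnreachableException.
final class StripCompletionSketch {

    static final class UnreachableException extends Exception {
        UnreachableException(String message) {
            super(message);
        }
    }

    /** Peels off CompletionException layers as long as they carry a cause. */
    static Throwable stripCompletion(Throwable failure) {
        Throwable current = failure;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Classifies a heartbeat RPC failure only after stripping the wrapper. */
    static boolean isTargetUnreachable(Throwable failure) {
        return stripCompletion(failure) instanceof UnreachableException;
    }

    public static void main(String[] args) {
        Throwable wrapped = new CompletionException(new UnreachableException("TM1 is gone"));
        System.out.println(isTargetUnreachable(wrapped));                       // true
        System.out.println(isTargetUnreachable(new RuntimeException("other"))); // false
    }
}
```

Unwrapping at the single place where heartbeat failures are classified keeps the check independent of how the RPC layer happens to complete its futures.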




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
