Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189019837

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
                 resourceManagerConnection.close();
                 resourceManagerConnection = null;
             }
    +
    +        startRegistrationTimeout();
    +    }
    +
    +    private void startRegistrationTimeout() {
    +        final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    +
    +        if (maxRegistrationDuration != null) {
    +            final UUID newRegistrationTimeoutId = UUID.randomUUID();
    +            currentRegistrationTimeoutId = newRegistrationTimeoutId;
    +            scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
    +        }
    +    }
    +
    +    private void stopRegistrationTimeout() {
    +        currentRegistrationTimeoutId = null;
    +    }
    +
    +    private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
    +        if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
    +            final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    +
    +            onFatalError(
    +                new RegistrationTimeoutException(
    +                    String.format("Could not register at the ResourceManager within the specified maximum " +
    +                        "registration duration %s. This indicates a problem with this instance. Terminating now.",
    +                        maxRegistrationDuration)));
    +        } else {
    +            log.debug("Ignoring outdated registration timeout.");
    --- End diff --

    True, will remove it.
---