xintongsong commented on code in PR #24163: URL: https://github.com/apache/flink/pull/24163#discussion_r1496940478
########## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java: ########## @@ -90,7 +92,7 @@ public class KubernetesResourceManagerDriver /** Current max pod index. When creating a new pod, it should increase one. */ private long currentMaxPodId = 0; - private Optional<KubernetesWatch> podsWatchOpt; + private CompletableFuture<KubernetesWatch> podsWatchOptFuture; Review Comment: ```suggestion private CompletableFuture<KubernetesWatch> podsWatchOptFuture = FutureUtils.completedExceptionally( new ResourceManagerException( "KubernetesResourceManagerDriver is not initialized.")); ``` ########## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java: ########## @@ -450,17 +462,30 @@ public void handleError(Throwable throwable) { if (throwable instanceof KubernetesTooOldResourceVersionException) { getMainThreadExecutor() .execute( - () -> { - if (running) { - podsWatchOpt.ifPresent(KubernetesWatch::close); - log.info("Creating a new watch on TaskManager pods."); - try { - podsWatchOpt = watchTaskManagerPods(); - } catch (Exception e) { - getResourceEventHandler().onError(e); - } - } - }); + () -> + podsWatchOptFuture.whenCompleteAsync( + (KubernetesWatch watch, Throwable throwable1) -> { + if (running) { + try { + if (watch != null) { + watch.close(); + } + } catch (Exception e) { + log.warn( + "Error when get old watch to close, which is not supposed to happen", + e); + } + log.info( + "Creating a new watch on TaskManager pods."); + try { + podsWatchOptFuture = + watchTaskManagerPods(); + } catch (Exception e) { + getResourceEventHandler().onError(e); + } + } + }, + getMainThreadExecutor())); Review Comment: The outer `getMainThreadExecutor().execute()` is unnecessary. ########## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java: ########## @@ -90,7 +92,7 @@ public class KubernetesResourceManagerDriver /** Current max pod index. When creating a new pod, it should increase one. */ private long currentMaxPodId = 0; - private Optional<KubernetesWatch> podsWatchOpt; + private CompletableFuture<KubernetesWatch> podsWatchOptFuture; Review Comment: This helps prevent NPE if `podsWatchOptFuture` is accidentally being used before being initialized. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org