wangyang0918 commented on a change in pull request #14837:
URL: https://github.com/apache/flink/pull/14837#discussion_r571643362



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -359,8 +363,17 @@ public void onError(List<KubernetesPod> pods) {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            getResourceEventHandler().onError(throwable);
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                getMainThreadExecutor()
+                        .execute(
+                                () -> {
+                                    log.info("Creating a new watch on TaskManager pods.");
+                                    podsWatchOpt = watchTaskManagerPods();

Review comment:
   We already do the running check in `KubernetesLeaderElectionDriver` and `KubernetesLeaderRetrievalDriver`. I have not done it here because the JobManager process exits once `KubernetesResourceManagerDriver#terminate` is executed. After that, all the resources (e.g. websockets, executors) are destroyed, so it seems we do not need the clean-up here.
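   For comparison, the running check mentioned for the leader election/retrieval drivers could look roughly like the guard below. This is a hypothetical sketch, not Flink code: `TooOldResourceVersionException`, `RewatchingDriver`, `createWatch` and the single-threaded executor (standing in for the main thread executor) are all illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Flink's KubernetesTooOldResourceVersionException.
class TooOldResourceVersionException extends RuntimeException {
    TooOldResourceVersionException(String message) {
        super(message);
    }
}

// Hypothetical driver: all state changes run on one "main thread" executor.
class RewatchingDriver {
    // Single-threaded executor standing in for the driver's main thread executor.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only touched on the main thread, so it needs no extra synchronization.
    private boolean running = true;

    // Written on the main thread, read by the caller after shutdownAndAwait().
    private volatile int watchesCreated = 0;

    // Non-recoverable errors are recorded here instead of being escalated.
    private volatile Throwable fatalError;

    RewatchingDriver() {
        mainThread.execute(this::createWatch);
    }

    // Hypothetical: a real driver would open a new Kubernetes watch here.
    private void createWatch() {
        watchesCreated++;
    }

    // Invoked by the watch callback when the watch ends with an error.
    void handleError(Throwable throwable) {
        if (throwable instanceof TooOldResourceVersionException) {
            mainThread.execute(
                    () -> {
                        // Running check: skip the re-watch once terminate() has run.
                        if (running) {
                            createWatch();
                        }
                    });
        } else {
            fatalError = throwable;
        }
    }

    // Queued on the main thread, so it is ordered after re-watch tasks queued before it.
    void terminate() {
        mainThread.execute(() -> running = false);
    }

    int watchesCreated() {
        return watchesCreated;
    }

    Throwable fatalError() {
        return fatalError;
    }

    // Lets already-queued main-thread tasks finish, then stops the executor.
    void shutdownAndAwait() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

   Routing `terminate()` through the same executor keeps it ordered with the queued re-watch tasks, so a re-watch requested after termination is skipped instead of racing with it.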
   
   For the old `podsWatchOpt`, I think it does not need to be released, because `WatchCallbackHandler#handleError` is only called when the watcher is already closing with an exception. So I think we do not need a duplicate close.
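   The ordering this argument relies on (the failing watch closes itself before `handleError` runs) can be modelled with the small sketch below. `SketchWatch` and `WatchHolder` are hypothetical illustrative classes, not Fabric8 or Flink types; `WatchHolder.current` plays the role of `podsWatchOpt`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical watch handle that counts how often close() is called.
class SketchWatch {
    private final AtomicInteger closeCalls = new AtomicInteger();
    private final Consumer<Throwable> errorCallback;

    SketchWatch(Consumer<Throwable> errorCallback) {
        this.errorCallback = errorCallback;
    }

    void close() {
        closeCalls.incrementAndGet();
    }

    int closeCalls() {
        return closeCalls.get();
    }

    // Simulates the watch ending with an error: it closes itself first,
    // and only then reports the error to the callback handler.
    void failWith(Throwable cause) {
        close();
        errorCallback.accept(cause);
    }
}

// Hypothetical owner of the current watch, playing the role of podsWatchOpt.
class WatchHolder {
    private SketchWatch current;
    private int created;

    WatchHolder() {
        current = newWatch();
    }

    private SketchWatch newWatch() {
        created++;
        return new SketchWatch(this::handleError);
    }

    // The failing watch is already closing, so only the reference is replaced;
    // calling current.close() here would be a duplicate close.
    void handleError(Throwable throwable) {
        current = newWatch();
    }

    SketchWatch current() {
        return current;
    }

    int created() {
        return created;
    }
}
```

   In this model the old watch ends up closed exactly once, and the handler's only job is to swap in a fresh watch.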




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

