tillrohrmann commented on a change in pull request #14837:
URL: https://github.com/apache/flink/pull/14837#discussion_r571627447



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -111,7 +117,12 @@ public void close() {
 
         LOG.info("Closing {}.", this);
         leaderElector.stop();
-        kubernetesWatch.close();
+
+        if (kubernetesWatch != null) {

Review comment:
       I think that we should do this check under the `watchLock`. It might be
correct now, but if all accesses to `kubernetesWatch` happen under the lock,
the code is easier to reason about.
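A minimal sketch of the suggested pattern, in which both the null check and the `close()` call happen under `watchLock`. `GuardedWatch`, `CountingGuardedWatch`, and `GuardedCloseDriver` are hypothetical stand-ins (not Flink's `KubernetesWatch` or `KubernetesLeaderElectionDriver`), and the `leaderElector.stop()` call is omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Flink's KubernetesWatch; only close() matters here.
interface GuardedWatch extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical watch that counts how often it was closed (for demonstration only).
class CountingGuardedWatch implements GuardedWatch {
    final AtomicInteger closeCount = new AtomicInteger();

    @Override
    public void close() {
        closeCount.incrementAndGet();
    }
}

// Sketch of a driver in which every read/write of kubernetesWatch happens under watchLock.
class GuardedCloseDriver {
    private final Object watchLock = new Object();

    // Guarded by watchLock.
    private boolean running = true;

    // Guarded by watchLock; may be null, e.g. if creating the watch failed.
    private GuardedWatch kubernetesWatch;

    GuardedCloseDriver(GuardedWatch initialWatch) {
        synchronized (watchLock) {
            kubernetesWatch = initialWatch;
        }
    }

    void close() {
        synchronized (watchLock) {
            if (!running) {
                return; // close() is idempotent
            }
            running = false;
            // The null check and close() share the lock with every other access, so a
            // re-creation of the watch that also holds watchLock cannot interleave here.
            if (kubernetesWatch != null) {
                kubernetesWatch.close();
                kubernetesWatch = null;
            }
        }
    }

    boolean isRunning() {
        synchronized (watchLock) {
            return running;
        }
    }

    boolean hasWatch() {
        synchronized (watchLock) {
            return kubernetesWatch != null;
        }
    }
}
```

With this shape, the invariant "`kubernetesWatch` is only touched under `watchLock`" holds without having to argue about the ordering of `running = false` relative to the watch callbacks.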

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -234,10 +245,21 @@ public void onError(List<KubernetesConfigMap> configMaps) {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderElectionException(
-                            "Error while watching the ConfigMap " + configMapName, throwable));
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                synchronized (watchLock) {
+                    if (running) {
+                        LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                        kubernetesWatch =

Review comment:
       Is it specified anywhere that `kubernetesWatch` closes automatically when
an exception occurs? If not, then I think we should still call
`kubernetesWatch.close()` on the old instance. It might not be required right now
because of how the existing watch implementations behave internally, but
this might change in the future.
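A minimal sketch of re-creating the watch while explicitly closing the old instance, assuming the re-creation happens under `watchLock` and is skipped once the driver is no longer running, as in the diff above. `ConfigMapWatch`, `TooOldResourceVersionException`, `CountingWatchFactory`, and `WatchRecreatingDriver` are hypothetical stand-ins for the Flink types, and the handling of other errors is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's KubernetesWatch.
interface ConfigMapWatch extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stand-in for KubernetesTooOldResourceVersionException.
class TooOldResourceVersionException extends RuntimeException {
    TooOldResourceVersionException(String message) {
        super(message);
    }
}

// Hypothetical factory that "creates a watch on the ConfigMap" and counts opens/closes.
class CountingWatchFactory implements Supplier<ConfigMapWatch> {
    final AtomicInteger opened = new AtomicInteger();
    final AtomicInteger closed = new AtomicInteger();

    @Override
    public ConfigMapWatch get() {
        opened.incrementAndGet();
        ConfigMapWatch watch = closed::incrementAndGet; // closing the watch bumps the counter
        return watch;
    }
}

class WatchRecreatingDriver {
    private final Object watchLock = new Object();
    private final Supplier<ConfigMapWatch> createWatch;
    private boolean running = true;         // guarded by watchLock
    private ConfigMapWatch kubernetesWatch; // guarded by watchLock

    WatchRecreatingDriver(Supplier<ConfigMapWatch> createWatch) {
        this.createWatch = createWatch;
        synchronized (watchLock) {
            kubernetesWatch = createWatch.get();
        }
    }

    void handleError(Throwable throwable) {
        if (throwable instanceof TooOldResourceVersionException) {
            synchronized (watchLock) {
                if (running) {
                    // Do not assume the failed watch closed itself; release it explicitly.
                    if (kubernetesWatch != null) {
                        kubernetesWatch.close();
                    }
                    kubernetesWatch = createWatch.get();
                }
            }
        } else {
            // Assumption: other errors are escalated; a real driver would notify its
            // fatal error handler instead of throwing.
            throw new IllegalStateException("Error while watching the ConfigMap", throwable);
        }
    }

    void close() {
        synchronized (watchLock) {
            running = false;
            if (kubernetesWatch != null) {
                kubernetesWatch.close();
                kubernetesWatch = null;
            }
        }
    }
}
```

Closing unconditionally costs little if the old watch already closed itself (assuming its `close()` is idempotent) and removes the dependency on that undocumented behaviour.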

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
##########
@@ -81,7 +89,11 @@ public void close() {
         running = false;
 
         LOG.info("Stopping {}.", this);
-        kubernetesWatch.close();
+        if (kubernetesWatch != null) {

Review comment:
       Same here. I would do the check under the `watchLock`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
##########
@@ -114,10 +126,21 @@ public void onError(List<KubernetesConfigMap> configMaps) {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderRetrievalException(
-                            "Error while watching the ConfigMap " + configMapName));
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                synchronized (watchLock) {
+                    if (running) {
+                        LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                        kubernetesWatch =

Review comment:
       Is it specified anywhere that `kubernetesWatch` closes automatically when
an exception occurs? If not, then I think we should still call
`kubernetesWatch.close()` on the old instance. It might not be required right now
because of how the existing watch implementations behave internally, but
this might change in the future.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -359,8 +363,17 @@ public void onError(List<KubernetesPod> pods) {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            getResourceEventHandler().onError(throwable);
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                getMainThreadExecutor()
+                        .execute(
+                                () -> {
+                                    log.info("Creating a new watch on TaskManager pods.");
+                                    podsWatchOpt = watchTaskManagerPods();

Review comment:
       I think we need to check whether the `KubernetesResourceManagerDriver`
is still running or has already been `terminated`. Moreover, we should probably
close the old `podsWatchOpt` to release any resources it may have acquired.
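A minimal sketch of both suggestions, assuming the main thread executor runs tasks one at a time, so a `terminated` flag read inside the task is consistent with `terminate()`. `PodsWatch`, `PodsTooOldResourceVersionException`, `QueueingMainThreadExecutor`, `CountingPodsWatchFactory`, and `ResourceManagerDriverSketch` are hypothetical; the supplier only plays the role of `watchTaskManagerPods()`.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the watch held in podsWatchOpt.
interface PodsWatch extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stand-in for KubernetesTooOldResourceVersionException.
class PodsTooOldResourceVersionException extends RuntimeException {
    PodsTooOldResourceVersionException(String message) {
        super(message);
    }
}

// Deterministic single-threaded "main thread": queued tasks run only when runAll() is called.
class QueueingMainThreadExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
    }

    void runAll() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}

// Hypothetical replacement for watchTaskManagerPods(); counts opened and closed watches.
class CountingPodsWatchFactory implements Supplier<Optional<PodsWatch>> {
    final AtomicInteger opened = new AtomicInteger();
    final AtomicInteger closed = new AtomicInteger();

    @Override
    public Optional<PodsWatch> get() {
        opened.incrementAndGet();
        PodsWatch watch = closed::incrementAndGet;
        return Optional.of(watch);
    }
}

class ResourceManagerDriverSketch {
    private final Executor mainThreadExecutor;
    private final Supplier<Optional<PodsWatch>> watchTaskManagerPods;
    // Only accessed from the main thread.
    private boolean terminated = false;
    private Optional<PodsWatch> podsWatchOpt;

    ResourceManagerDriverSketch(
            Executor mainThreadExecutor, Supplier<Optional<PodsWatch>> watchTaskManagerPods) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.watchTaskManagerPods = watchTaskManagerPods;
        this.podsWatchOpt = watchTaskManagerPods.get();
    }

    // May be called from a watcher thread; state changes are moved onto the main thread.
    // Handling of other errors is omitted from this sketch.
    void handleError(Throwable throwable) {
        if (throwable instanceof PodsTooOldResourceVersionException) {
            mainThreadExecutor.execute(
                    () -> {
                        if (terminated) {
                            return; // already shut down: do not open a new watch
                        }
                        podsWatchOpt.ifPresent(PodsWatch::close); // release the old watch
                        podsWatchOpt = watchTaskManagerPods.get();
                    });
        }
    }

    // Assumed to run on the main thread.
    void terminate() {
        terminated = true;
        podsWatchOpt.ifPresent(PodsWatch::close);
        podsWatchOpt = Optional.empty();
    }
}
```

The check has to happen inside the scheduled task rather than in `handleError` itself, because `terminate()` may run between scheduling the task and executing it.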




----------------------------------------------------------------
This is an automated message from the Apache Git Service.