tillrohrmann commented on a change in pull request #15393:
URL: https://github.com/apache/flink/pull/15393#discussion_r605083950



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
##########
@@ -127,7 +129,14 @@ private String createZkPath(JobID jobID) {
     private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
         LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
         final String zkPath = createZkPath(jobID);
-        this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-        this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        final Stat stat = this.client.checkExists().forPath(zkPath);
+        if (stat != null) {
+            this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        } else {
+            this.client
+                    .create()
+                    .creatingParentContainersIfNeeded()
+                    .forPath(zkPath, status.name().getBytes(ENCODING));
+        }

Review comment:
       Maybe we could add tests using the `TestingServer` where we prepare the 
znodes for the path in order to test the different cases:
   
   1. Empty path (the znode does not exist yet)
   2. Path already exists
   
   The same applies to the `clearJob` method.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
##########
@@ -127,7 +129,14 @@ private String createZkPath(JobID jobID) {
     private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
         LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
         final String zkPath = createZkPath(jobID);
-        this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-        this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        final Stat stat = this.client.checkExists().forPath(zkPath);
+        if (stat != null) {
+            this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        } else {
+            this.client
+                    .create()
+                    .creatingParentContainersIfNeeded()
+                    .forPath(zkPath, status.name().getBytes(ENCODING));
+        }

Review comment:
       The same can happen when calling `clearJob`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
##########
@@ -127,7 +129,14 @@ private String createZkPath(JobID jobID) {
     private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
         LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
         final String zkPath = createZkPath(jobID);
-        this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-        this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        final Stat stat = this.client.checkExists().forPath(zkPath);
+        if (stat != null) {
+            this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        } else {
+            this.client
+                    .create()
+                    .creatingParentContainersIfNeeded()
+                    .forPath(zkPath, status.name().getBytes(ENCODING));
+        }

Review comment:
       To make this method rock solid, I think we need to wrap it in a while 
loop and retry the operation if something has changed between 
`this.client.checkExists().forPath(zkPath)` and one of the if branches. For 
example, after `checkExists` returns a non-null `Stat`, somebody else could 
remove the node. Then 
`this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));` 
should throw a `KeeperException.NoNodeException`. Conversely, if somebody 
creates the node after `checkExists` returns `null`, the `create` call should 
fail with a `KeeperException.NodeExistsException`. In practice this should not 
really happen, but at the moment the creation/update procedure is not an 
atomic operation.
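
A rough sketch of the retry loop I have in mind. To keep it self-contained it 
does not use Curator: `NodeStore`, `InMemoryNodeStore`, `RetryingWriter` and 
the two exception classes are hypothetical stand-ins for the `checkExists`, 
`setData` and `create` calls and for the corresponding `KeeperException` 
subclasses, so please read it as an illustration of the control flow only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for KeeperException.NoNodeException. */
class NoNodeException extends Exception {}

/** Hypothetical stand-in for KeeperException.NodeExistsException. */
class NodeExistsException extends Exception {}

/** Hypothetical minimal view of the ZooKeeper operations used by the method. */
interface NodeStore {
    boolean exists(String path);

    void setData(String path, byte[] data) throws NoNodeException;

    void create(String path, byte[] data) throws NodeExistsException;
}

/** In-memory implementation, only here so that the sketch can be executed. */
class InMemoryNodeStore implements NodeStore {
    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

    @Override
    public boolean exists(String path) {
        return nodes.containsKey(path);
    }

    @Override
    public void setData(String path, byte[] data) throws NoNodeException {
        // Updates only if the node is present, otherwise mimics ZooKeeper's error.
        if (nodes.computeIfPresent(path, (p, old) -> data) == null) {
            throw new NoNodeException();
        }
    }

    @Override
    public void create(String path, byte[] data) throws NodeExistsException {
        // Creates only if the node is absent, otherwise mimics ZooKeeper's error.
        if (nodes.putIfAbsent(path, data) != null) {
            throw new NodeExistsException();
        }
    }

    byte[] get(String path) {
        return nodes.get(path);
    }

    void delete(String path) {
        nodes.remove(path);
    }
}

final class RetryingWriter {
    private RetryingWriter() {}

    /** Repeats the check-then-write sequence until one of the branches succeeds. */
    static void writeWithRetry(NodeStore store, String path, byte[] data) {
        while (true) {
            try {
                if (store.exists(path)) {
                    store.setData(path, data);
                } else {
                    store.create(path, data);
                }
                return;
            } catch (NoNodeException | NodeExistsException e) {
                // The node appeared or disappeared between the check and the
                // write, so check again and take the other branch.
            }
        }
    }
}
```

The loop is unbounded here for brevity. Whether the real method should cap the 
number of attempts is a separate question that this sketch does not answer.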



