tillrohrmann commented on a change in pull request #15393:
URL: https://github.com/apache/flink/pull/15393#discussion_r605083950
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
##########
@@ -127,7 +129,14 @@ private String createZkPath(JobID jobID) {
     private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
         LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
         final String zkPath = createZkPath(jobID);
-        this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-        this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        final Stat stat = this.client.checkExists().forPath(zkPath);
+        if (stat != null) {
+            this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        } else {
+            this.client
+                    .create()
+                    .creatingParentContainersIfNeeded()
+                    .forPath(zkPath, status.name().getBytes(ENCODING));
+        }

Review comment:
Maybe we could add tests using the `TestingServer` where we prepare the znodes for the path in order to test the different cases:

1. Empty path (the znode does not exist yet)
2. Path already exists

The same applies to the `clearJob` method.
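The two cases could be exercised roughly as follows. This is a sketch under stated assumptions: to stay self-contained it does not start Curator's `TestingServer`, but uses a hypothetical in-memory `FakeZooKeeper` that enforces ZooKeeper's rules that `create` fails on an existing node and `setData`/`delete` fail on a missing one. `writeStatus` mirrors the branch structure of `writeEnumToZooKeeper` from the diff; `clearStatus`, the path `/running_job_registry/job-1`, and the other names are illustrative and not Flink code. A real test would prepare the znodes through a `CuratorFramework` client connected to the `TestingServer` instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class Main {
    // Hypothetical stand-ins for ZooKeeper's NoNodeException / NodeExistsException
    // (unchecked here only to keep the sketch short; the real KeeperException is checked).
    static class NoNodeException extends RuntimeException {}
    static class NodeExistsException extends RuntimeException {}

    /** Hypothetical in-memory znode store enforcing ZooKeeper-like create/setData/delete rules. */
    static class FakeZooKeeper {
        final Map<String, byte[]> nodes = new HashMap<>();
        String lastOp = "none"; // records which write operation ran last

        boolean exists(String path) {
            return nodes.containsKey(path);
        }

        void create(String path, byte[] data) {
            if (nodes.containsKey(path)) throw new NodeExistsException();
            nodes.put(path, data);
            lastOp = "create";
        }

        void setData(String path, byte[] data) {
            if (!nodes.containsKey(path)) throw new NoNodeException();
            nodes.put(path, data);
            lastOp = "setData";
        }

        void delete(String path) {
            if (nodes.remove(path) == null) throw new NoNodeException();
            lastOp = "delete";
        }
    }

    // Same branch structure as writeEnumToZooKeeper in the diff: setData if present, else create.
    static void writeStatus(FakeZooKeeper zk, String path, String status) {
        byte[] data = status.getBytes(StandardCharsets.UTF_8);
        if (zk.exists(path)) {
            zk.setData(path, data);
        } else {
            zk.create(path, data);
        }
    }

    // Hypothetical clear operation, assumed to remove the node only if it is present.
    static void clearStatus(FakeZooKeeper zk, String path) {
        if (zk.exists(path)) {
            zk.delete(path);
        }
    }

    static String read(FakeZooKeeper zk, String path) {
        return new String(zk.nodes.get(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String path = "/running_job_registry/job-1";

        // Case 1: empty path, nothing prepared, so the create branch must run.
        FakeZooKeeper empty = new FakeZooKeeper();
        writeStatus(empty, path, "RUNNING");
        System.out.println(empty.lastOp + " " + read(empty, path)); // create RUNNING

        // Case 2: znode prepared up front, so the setData branch must overwrite it.
        FakeZooKeeper prepared = new FakeZooKeeper();
        prepared.nodes.put(path, "PENDING".getBytes(StandardCharsets.UTF_8));
        writeStatus(prepared, path, "DONE");
        System.out.println(prepared.lastOp + " " + read(prepared, path)); // setData DONE

        // clearStatus: removes an existing node and is a no-op on an empty path.
        clearStatus(prepared, path);
        clearStatus(empty, "/running_job_registry/job-2");
        System.out.println(prepared.exists(path)); // false
    }
}
```

Because the fake throws on the wrong branch, a regression such as calling `setData` on a path that was never created would surface as an exception in case 1.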

Review comment:
The same can happen when calling `clearJob`.

Review comment:
In order to make this method rock solid, I think we need to wrap it in a while loop and retry the operation if something has changed between
`this.client.checkExists().forPath(zkPath)` and one of the if branches. For example, after `checkExists` has returned a non-null `Stat` (i.e. the node exists), somebody else could remove the node. The subsequent `this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));` would then throw a `KeeperException.NoNodeException`. Conversely, if somebody else creates the node after `checkExists` has returned `null`, the `create()` call would throw a `KeeperException.NodeExistsException`. This should rarely happen in practice, but at the moment the creation/update procedure is not an atomic operation.
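A minimal sketch of the suggested retry loop, again against a hypothetical in-memory store rather than Curator. The `afterExists` hook on the hypothetical `FakeZooKeeper` injects a concurrent deletion right after the existence check, so the first `setData` fails with a stand-in `NoNodeException` and the loop re-checks and takes the `create` branch. In the real registry the catch clause would presumably name `KeeperException.NoNodeException` and `KeeperException.NodeExistsException`. `clearStatus` is a hypothetical illustration of how a clear operation could treat an already-missing node as success instead of retrying.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class Main {
    // Hypothetical stand-ins for ZooKeeper's KeeperException.NoNodeException and
    // KeeperException.NodeExistsException (unchecked here only to keep the sketch short).
    static class NoNodeException extends RuntimeException {}
    static class NodeExistsException extends RuntimeException {}

    /** Hypothetical in-memory znode store; afterExists lets a test inject a concurrent change. */
    static class FakeZooKeeper {
        final Map<String, byte[]> nodes = new HashMap<>();
        Runnable afterExists = null; // runs once, right after the next exists() call

        boolean exists(String path) {
            boolean result = nodes.containsKey(path);
            Runnable hook = afterExists;
            afterExists = null;
            if (hook != null) {
                hook.run(); // simulates another client acting between check and write
            }
            return result;
        }

        void setData(String path, byte[] data) {
            if (!nodes.containsKey(path)) throw new NoNodeException();
            nodes.put(path, data);
        }

        void create(String path, byte[] data) {
            if (nodes.containsKey(path)) throw new NodeExistsException();
            nodes.put(path, data);
        }

        void delete(String path) {
            if (nodes.remove(path) == null) throw new NoNodeException();
        }
    }

    /** Check-then-act write, retried whenever the node changed in between. Returns attempts used. */
    static int writeStatusWithRetry(FakeZooKeeper zk, String path, String status) {
        byte[] data = status.getBytes(StandardCharsets.UTF_8);
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                if (zk.exists(path)) {
                    zk.setData(path, data); // fails if the node was deleted after exists()
                } else {
                    zk.create(path, data); // fails if the node was created after exists()
                }
                return attempts;
            } catch (NoNodeException | NodeExistsException e) {
                // The node changed between the check and the write: loop and re-check.
            }
        }
    }

    /** Hypothetical clear operation: a node that is already gone counts as cleared. */
    static void clearStatus(FakeZooKeeper zk, String path) {
        try {
            zk.delete(path);
        } catch (NoNodeException e) {
            // Someone else removed it first; the desired end state is already reached.
        }
    }

    public static void main(String[] args) {
        String path = "/running_job_registry/job-1";
        FakeZooKeeper zk = new FakeZooKeeper();
        zk.nodes.put(path, "RUNNING".getBytes(StandardCharsets.UTF_8));
        // Another client deletes the node right after our exists() check returned true.
        zk.afterExists = () -> zk.nodes.remove(path);

        int attempts = writeStatusWithRetry(zk, path, "DONE");
        System.out.println(attempts + " " + new String(zk.nodes.get(path), StandardCharsets.UTF_8)); // 2 DONE

        clearStatus(zk, path);
        clearStatus(zk, path); // second call hits the missing node and is tolerated
        System.out.println(zk.nodes.containsKey(path)); // false
    }
}
```

As suggested, the loop is unbounded; a production version might cap the number of attempts so that a pathological sequence of concurrent changes cannot spin forever.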