[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755120267

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -87,18 +88,19 @@ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
    -    @VisibleForTesting
    -    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
    -            newHashSet(
    -                    KeeperException.NodeExistsException.class,
    -                    KeeperException.BadArgumentsException.class,
    -                    KeeperException.NoNodeException.class,
    -                    KeeperException.NoAuthException.class,
    -                    KeeperException.BadVersionException.class,
    -                    KeeperException.AuthFailedException.class,
    -                    KeeperException.InvalidACLException.class,
    -                    KeeperException.SessionMovedException.class,
    -                    KeeperException.NotReadOnlyException.class);
    +    /** Pre-commit exceptions that don't imply data inconsistency. */
    +    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
    +            new HashSet<>(
    +                    Arrays.asList(
    +                            KeeperException.NodeExistsException.class,
    +                            KeeperException.BadArgumentsException.class,
    +                            KeeperException.NoNodeException.class,
    +                            KeeperException.NoAuthException.class,
    +                            KeeperException.BadVersionException.class,
    +                            KeeperException.AuthFailedException.class,
    +                            KeeperException.InvalidACLException.class,
    +                            KeeperException.SessionMovedException.class,
    +                            KeeperException.NotReadOnlyException.class));

Review comment:
One more suggestion: let's merge it as is and follow up in a different PR. It's still a big improvement over the original state, and we'd need the patch for the transaction code path anyway.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755118309

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -87,18 +88,19 @@ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
    -    @VisibleForTesting
    -    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
    -            newHashSet(
    -                    KeeperException.NodeExistsException.class,
    -                    KeeperException.BadArgumentsException.class,
    -                    KeeperException.NoNodeException.class,
    -                    KeeperException.NoAuthException.class,
    -                    KeeperException.BadVersionException.class,
    -                    KeeperException.AuthFailedException.class,
    -                    KeeperException.InvalidACLException.class,
    -                    KeeperException.SessionMovedException.class,
    -                    KeeperException.NotReadOnlyException.class);
    +    /** Pre-commit exceptions that don't imply data inconsistency. */
    +    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
    +            new HashSet<>(
    +                    Arrays.asList(
    +                            KeeperException.NodeExistsException.class,
    +                            KeeperException.BadArgumentsException.class,
    +                            KeeperException.NoNodeException.class,
    +                            KeeperException.NoAuthException.class,
    +                            KeeperException.BadVersionException.class,
    +                            KeeperException.AuthFailedException.class,
    +                            KeeperException.InvalidACLException.class,
    +                            KeeperException.SessionMovedException.class,
    +                            KeeperException.NotReadOnlyException.class));

Review comment:
I'm just desperate here... I'd say that, except for `NoNodeException` and `BadArgumentsException`, all of them are "potentially" unsafe if preceded by a connection loss. I'd be in favor of removing this list completely, as it's really hard to reason about. I'd even suggest reverting https://issues.apache.org/jira/browse/FLINK-22494 and coming up with a different solution (discarding the checkpoint & validating metadata before recovery).
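The stricter rule proposed above can be sketched as follows. The sketch assumes that only no-node and bad-arguments failures count as safe, meaning the state may be discarded, and that everything else counts as possibly inconsistent. To keep the example self-contained, the exception classes are hypothetical stand-ins for ZooKeeper's `KeeperException` subclasses, and `isSafeToDiscard` is an illustrative name rather than Flink's API. The lookup uses the exception's exact class, so a wrapped or subclassed exception is conservatively treated as unsafe.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class PreCommitClassifierSketch {

    // Hypothetical stand-ins for ZooKeeper's KeeperException subclasses.
    static class StandInKeeperException extends Exception {
        StandInKeeperException(String name) {
            super(name);
        }
    }

    static final class NoNode extends StandInKeeperException {
        NoNode() { super("NoNode"); }
    }

    static final class BadArguments extends StandInKeeperException {
        BadArguments() { super("BadArguments"); }
    }

    static final class NodeExists extends StandInKeeperException {
        NodeExists() { super("NodeExists"); }
    }

    static final class SessionMoved extends StandInKeeperException {
        SessionMoved() { super("SessionMoved"); }
    }

    // Failures assumed to prove the write never happened, even after a retried connection loss.
    private static final Set<Class<? extends StandInKeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
            new HashSet<>(Arrays.asList(NoNode.class, BadArguments.class));

    /** True only if the exception's exact class is in the safe set; anything else is unsafe. */
    static boolean isSafeToDiscard(Exception e) {
        return SAFE_PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
    }

    public static void main(String[] args) {
        Exception[] samples = {new NoNode(), new BadArguments(), new NodeExists(), new SessionMoved()};
        for (Exception e : samples) {
            System.out.println(e.getMessage() + " -> safe to discard: " + isSafeToDiscard(e));
        }
    }
}
```

Keeping the safe set small trades occasional "possibly inconsistent" escalations for never discarding state that ZooKeeper might still reference.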
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755105408

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -87,18 +88,19 @@ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
    -    @VisibleForTesting
    -    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
    -            newHashSet(
    -                    KeeperException.NodeExistsException.class,
    -                    KeeperException.BadArgumentsException.class,
    -                    KeeperException.NoNodeException.class,
    -                    KeeperException.NoAuthException.class,
    -                    KeeperException.BadVersionException.class,
    -                    KeeperException.AuthFailedException.class,
    -                    KeeperException.InvalidACLException.class,
    -                    KeeperException.SessionMovedException.class,
    -                    KeeperException.NotReadOnlyException.class);
    +    /** Pre-commit exceptions that don't imply data inconsistency. */
    +    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
    +            new HashSet<>(
    +                    Arrays.asList(
    +                            KeeperException.NodeExistsException.class,

Review comment:
`replace` should never throw this exception, as it only works when the node already exists. After this change, neither should `addAndLock`, so I'd say it is safe to remove it.
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755095766

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -87,18 +88,19 @@ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
    -    @VisibleForTesting
    -    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
    -            newHashSet(
    -                    KeeperException.NodeExistsException.class,
    -                    KeeperException.BadArgumentsException.class,
    -                    KeeperException.NoNodeException.class,
    -                    KeeperException.NoAuthException.class,
    -                    KeeperException.BadVersionException.class,
    -                    KeeperException.AuthFailedException.class,
    -                    KeeperException.InvalidACLException.class,
    -                    KeeperException.SessionMovedException.class,
    -                    KeeperException.NotReadOnlyException.class);
    +    /** Pre-commit exceptions that don't imply data inconsistency. */
    +    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
    +            new HashSet<>(
    +                    Arrays.asList(
    +                            KeeperException.NodeExistsException.class,
    +                            KeeperException.BadArgumentsException.class,
    +                            KeeperException.NoNodeException.class,
    +                            KeeperException.NoAuthException.class,
    +                            KeeperException.BadVersionException.class,
    +                            KeeperException.AuthFailedException.class,
    +                            KeeperException.InvalidACLException.class,
    +                            KeeperException.SessionMovedException.class,
    +                            KeeperException.NotReadOnlyException.class));

Review comment:
I'd say it could happen under very rare conditions (if a connection loss happens right before the retries are exhausted). We default to 3 retries with exponential backoff. We could also argue that this could happen with `AuthFailed` / `InvalidACLException` when there is some manual operation on the ZooKeeper server side, e.g. a sys-admin changes the ACL.

Description of `SessionMovedException`:

```
SESSIONMOVED Session moved to another server, so operation is ignored
```

From `RetryLoop`:

```
public static boolean shouldRetry(int rc) {
    return rc == Code.CONNECTIONLOSS.intValue()
            || rc == Code.OPERATIONTIMEOUT.intValue()
            || rc == Code.SESSIONMOVED.intValue()
            || rc == Code.SESSIONEXPIRED.intValue()
            || rc == -13;
}
```
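A toy simulation shows the underlying hazard: a retried operation can fail with an error that looks like a pre-commit failure even though an earlier attempt was already applied. Everything below is hypothetical and is not Curator's `RetryLoop`. `FakeServer` holds a map of nodes and can drop one response, the retry loop retries only on connection loss, and backoff is omitted. The retry limit of 3 mirrors the default mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

final class LostAckRetrySketch {

    static final class ConnectionLoss extends Exception {}

    static final class NodeExists extends Exception {}

    /** Hypothetical in-memory server that can drop the response of the next create. */
    static final class FakeServer {
        final Map<String, byte[]> nodes = new HashMap<>();
        boolean dropNextResponse;

        void create(String path, byte[] data) throws ConnectionLoss, NodeExists {
            if (nodes.containsKey(path)) {
                throw new NodeExists();
            }
            nodes.put(path, data); // the write is applied on the server...
            if (dropNextResponse) {
                dropNextResponse = false;
                throw new ConnectionLoss(); // ...but the client never receives the ack
            }
        }
    }

    /** Retries on connection loss only, up to maxRetries extra attempts (backoff omitted). */
    static void createWithRetries(FakeServer server, String path, byte[] data, int maxRetries)
            throws ConnectionLoss, NodeExists {
        for (int attempt = 0; ; attempt++) {
            try {
                server.create(path, data);
                return;
            } catch (ConnectionLoss e) {
                if (attempt >= maxRetries) {
                    throw e;
                }
            }
        }
    }

    /** Returns the exception the caller finally observes, or null if the create succeeded. */
    static Exception runScenario(FakeServer server) {
        try {
            createWithRetries(server, "/checkpoints/1", new byte[] {1}, 3);
            return null;
        } catch (ConnectionLoss | NodeExists e) {
            return e;
        }
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        server.dropNextResponse = true;
        Exception observed = runScenario(server);
        // The caller sees NodeExists, although its own first attempt created the node.
        System.out.println(observed.getClass().getSimpleName()
                + ", node present: " + server.nodes.containsKey("/checkpoints/1"));
    }
}
```

A classifier that treated the final `NodeExists` as harmless would discard state that the node still points to. This is why errors observed after a retried connection loss are hard to trust.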
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755079258

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,43 +156,40 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    -
    -        RetrievableStateHandle<T> storeHandle = storage.store(state);
    -
    -        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
    -
    +        if (exists(path).isExisting()) {
    +            throw new AlreadyExistException(
    +                    String.format("ZooKeeper node %s already exists.", path));
    +        }
    +        final RetrievableStateHandle<T> storeHandle = storage.store(state);
    +        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
             try {
                 writeStoreHandleTransactionally(path, serializedStoreHandle);
                 return storeHandle;
    +        } catch (KeeperException.NodeExistsException e) {
    +            // Transactions are not idempotent in the curator version we're currently using, so it
    +            // is actually possible that we've re-tried a transaction that has already succeeded.
    +            // We've ensured that the node hasn't been present prior executing the transaction, so
    +            // we can assume that this is a result of the retry mechanism.
    +            return storeHandle;

Review comment:
I don't think so; `replace` actually assumes that the node is already present. It simply updates the data in an existing znode, which should be idempotent if retried.
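A hypothetical in-memory znode map, which is not the ZooKeeper or Curator API, can sketch this distinction. A `setData`-style update on an existing node yields the same end state however often it is retried, and it can only fail with no-node. A retried `create` whose first attempt was applied fails with node-exists instead. `addWithRetry` follows the idea in the diff above: check that the node is absent first, then treat node-exists from the retried write as our own earlier attempt. That inference assumes a single writer per path.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class IdempotentWriteSketch {

    static final class NoNode extends Exception {}

    static final class NodeExists extends Exception {}

    /** Hypothetical in-memory znode store (not the ZooKeeper or Curator API). */
    static final class FakeZnodes {
        final Map<String, byte[]> nodes = new HashMap<>();

        void create(String path, byte[] data) throws NodeExists {
            if (nodes.putIfAbsent(path, data) != null) {
                throw new NodeExists();
            }
        }

        // Update-only: fails with NoNode if absent and can never fail with NodeExists.
        void setData(String path, byte[] data) throws NoNode {
            if (!nodes.containsKey(path)) {
                throw new NoNode();
            }
            nodes.put(path, data);
        }

        boolean exists(String path) {
            return nodes.containsKey(path);
        }
    }

    /** Replace-style write: applying it twice leaves the same state as applying it once. */
    static void replaceWithRetry(FakeZnodes zk, String path, byte[] data) throws NoNode {
        zk.setData(path, data);
        zk.setData(path, data); // simulated retry after a lost response
    }

    /** Add-style write: pre-check absence, then treat NodeExists from a retry as our own write. */
    static void addWithRetry(FakeZnodes zk, String path, byte[] data) throws NodeExists {
        if (zk.exists(path)) {
            throw new IllegalStateException("ZooKeeper node " + path + " already exists.");
        }
        zk.create(path, data); // first attempt is applied, but assume its ack was lost
        try {
            zk.create(path, data); // simulated retry
        } catch (NodeExists e) {
            // The node was absent before we started, so (with a single writer) this is our write.
        }
    }

    public static void main(String[] args) throws Exception {
        FakeZnodes zk = new FakeZnodes();
        addWithRetry(zk, "/jobgraphs/a", new byte[] {1});
        replaceWithRetry(zk, "/jobgraphs/a", new byte[] {2});
        System.out.println(Arrays.toString(zk.nodes.get("/jobgraphs/a"))); // prints [2]
    }
}
```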
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743562938

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    +        try {
    +            try {
    +                // Obtain the write-lock.
    +                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
It could indeed result in the `NodeExistsException`. However, this wouldn't introduce inconsistencies, because we don't discard the underlying state. On the other hand, this would trigger the "unexpected" catch block below ... 🤔

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    +        try {
    +            try {
    +                // Obtain the write-lock.
    +                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
We can simply get rid of this write lock. It was meant as a safeguard, but it seems to introduce more corner cases. If we trust the leader election mechanism here, it's not needed. I'll remove it.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    +        try {
    +            try {
    +                // Obtain the write-lock.
    +                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
    +            } catch (KeeperException.NodeExistsException e) {
    +                // There should always be a single JobMaster for each job, so we should never
    +                // encounter this.
    +                throw new IllegalStateException(
    +                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
    +                        e);
    +            }
    +            if (exists(path).isExisting()) {
    +                throw new AlreadyExistException(
    +                        String.format("ZooKeeper node %s already exists.", path));
    +            }
    +            return doAddAndLock(path, state);
    +        } finally {
    +            // Release the write-lock.
    +            deleteIfExists(getWriteLockPath(path));
    +        }
    +    }
    -        RetrievableStateHandle<T> storeHandle = storage.store(state);
    -
    -        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
    -
    +    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
    +        final RetrievableStateHandle<T> storeHandle = storage.store(state);
    +        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
             try {
                 writeStoreHandleTransactionally(path, serializedStoreHandle);
                 return storeHandle;
             } catch (Exception e) {
                 if (indicatesPossiblyInconsistentState(e)) {
                     throw new PossibleInconsistentStateException(e);
                 }
    -
    -            // in any other failure case: discard the state
    +            // In case of any other failure, discard the state and rethrow the exception.
                 storeHandle.discardState();
    -
    -            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
    -
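The ordering argument in the first comment above can be sketched with hypothetical in-memory stand-ins: a set of ephemeral lock nodes and a list that plays the state storage. Neither is Flink's API. The lock is taken before anything is stored, so a node-exists failure on the lock surfaces only as an unexpected error, and no stored state is left behind to discard. This sketch releases the lock only when it acquired it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class WriteLockOrderingSketch {

    // Hypothetical stand-ins: ephemeral lock nodes in ZooKeeper and the external state storage.
    final Set<String> ephemeralNodes = new HashSet<>();
    final List<String> storedState = new ArrayList<>();

    String addAndLock(String path, String state) {
        String writeLockPath = path + "/write-lock"; // hypothetical lock path
        if (!ephemeralNodes.add(writeLockPath)) {
            // Fails before any state is stored: surprising for the caller, but nothing to discard.
            throw new IllegalStateException("Write lock " + writeLockPath + " is already taken.");
        }
        try {
            storedState.add(state); // only from here on could state need discarding
            return state;
        } finally {
            ephemeralNodes.remove(writeLockPath); // release only the lock this call acquired
        }
    }

    public static void main(String[] args) {
        WriteLockOrderingSketch store = new WriteLockOrderingSketch();
        // Simulate the lock node already being present (e.g. a retried create that had succeeded).
        store.ephemeralNodes.add("/jobs/1/write-lock");
        try {
            store.addAndLock("/jobs/1", "job-graph");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " Stored entries: " + store.storedState.size());
        }
    }
}
```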
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743565748

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    +        try {
    +            try {
    +                // Obtain the write-lock.
    +                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
    +            } catch (KeeperException.NodeExistsException e) {
    +                // There should always be a single JobMaster for each job, so we should never
    +                // encounter this.
    +                throw new IllegalStateException(
    +                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
    +                        e);
    +            }
    +            if (exists(path).isExisting()) {
    +                throw new AlreadyExistException(
    +                        String.format("ZooKeeper node %s already exists.", path));
    +            }
    +            return doAddAndLock(path, state);
    +        } finally {
    +            // Release the write-lock.
    +            deleteIfExists(getWriteLockPath(path));
    +        }
    +    }
    -        RetrievableStateHandle<T> storeHandle = storage.store(state);
    -
    -        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
    -
    +    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
    +        final RetrievableStateHandle<T> storeHandle = storage.store(state);
    +        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
             try {
                 writeStoreHandleTransactionally(path, serializedStoreHandle);
                 return storeHandle;
             } catch (Exception e) {
                 if (indicatesPossiblyInconsistentState(e)) {
                     throw new PossibleInconsistentStateException(e);
                 }
    -
    -            // in any other failure case: discard the state
    +            // In case of any other failure, discard the state and rethrow the exception.
                 storeHandle.discardState();
    -
    -            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
    -            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
    -                    .map(
    -                            nee ->
    -                                    new AlreadyExistException(
    -                                            "ZooKeeper node " + path + " already exists.", nee))
    -                    .orElseThrow(() -> e);
    +            throw e;
             }
         }
         // this method is provided for the sole purpose of easier testing
         @VisibleForTesting
         protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
                 throws Exception {
    -        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
    -        // smaller than the state itself. This level of indirection makes sure that data in
    -        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
    -        // the state can be larger.
    -        // Create the lock node in a transaction with the actual state node. That way we can
    +        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
    +        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
    +        // small, because ZooKeeper is designed for data in the KB range, but the state can be
    +        // larger. Create the lock node in a transaction with the actual state node. That way we can
             // prevent race conditions with a concurrent delete operation.
    -        client.inTransaction()
    -                .create()
    -                .withMode(CreateMode.PERSISTENT)
    -                .forPath(path, serializedStoreHandle)
    -                .and()
    -                .create()
    -                .withMode(CreateMode.EPHEMERAL)
    -                .forPath(getLockPath(path))
    -                .and()
    -                .commit();
    +        while (true) {
    +            try {
    +                client.inTransaction()
    +                        .create()
    +                        .withMode(CreateMode.PERSISTENT)
    +                        .forPath(path, serializedStoreHandle)
    +                        .and()
    +                        .create()
    +                        .withMode(CreateMode.EPHEMERAL)
    +                        .forPath(getLockPath(path))
    +                        .and()
    +                        .commit();
    +                break;
    +            } catch (KeeperException.NodeExistsException e) {

Review comment:
We now check whethe
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743563968

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    +        try {
    +            try {
    +                // Obtain the write-lock.
    +                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
We can simply get rid of this write lock. It was meant as a safeguard, but it seems to introduce more corner cases. If we trust the leader election mechanism here, it's not needed. I'll remove it.
[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r742796681

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ##

    @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
                 throws PossibleInconsistentStateException, Exception {
             checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
             checkNotNull(state, "State");
    -
             final String path = normalizePath(pathInZooKeeper);
    +        try {
    +            try {
    +                // Obtain the write-lock.
    +                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
    +            } catch (KeeperException.NodeExistsException e) {
    +                // There should always be a single JobMaster for each job, so we should never
    +                // encounter this.
    +                throw new IllegalStateException(
    +                        "Write lock is already taken, which signals there are multiple processing writing to the same path. This should never happen.",
    +                        e);
    +            }
    +            if (exists(path).isExisting()) {
    +                throw new AlreadyExistException(
    +                        String.format("ZooKeeper node %s already exists.", path));
    +            }
    +            return doAddAndLock(path, state);
    +        } finally {
    +            // Release the write-lock.
    +            deleteIfExists(getWriteLockPath(path));
    +        }
    +    }
    -        RetrievableStateHandle<T> storeHandle = storage.store(state);
    -
    -        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
    -
    +    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
    +        final RetrievableStateHandle<T> storeHandle = storage.store(state);
    +        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
             try {
                 writeStoreHandleTransactionally(path, serializedStoreHandle);
                 return storeHandle;
             } catch (Exception e) {
                 if (indicatesPossiblyInconsistentState(e)) {
                     throw new PossibleInconsistentStateException(e);
                 }
    -
    -            // in any other failure case: discard the state
    +            // In case of any other failure, discard the state and rethrow the exception.
                 storeHandle.discardState();
    -
    -            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
    -            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
    -                    .map(
    -                            nee ->
    -                                    new AlreadyExistException(
    -                                            "ZooKeeper node " + path + " already exists.", nee))
    -                    .orElseThrow(() -> e);
    +            throw e;
             }
         }
         // this method is provided for the sole purpose of easier testing
         @VisibleForTesting
         protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
                 throws Exception {
    -        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
    -        // smaller than the state itself. This level of indirection makes sure that data in
    -        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
    -        // the state can be larger.
    -        // Create the lock node in a transaction with the actual state node. That way we can
    +        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
    +        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
    +        // small, because ZooKeeper is designed for data in the KB range, but the state can be
    +        // larger. Create the lock node in a transaction with the actual state node. That way we can
             // prevent race conditions with a concurrent delete operation.
    -        client.inTransaction()
    -                .create()
    -                .withMode(CreateMode.PERSISTENT)
    -                .forPath(path, serializedStoreHandle)
    -                .and()
    -                .create()
    -                .withMode(CreateMode.EPHEMERAL)
    -                .forPath(getLockPath(path))
    -                .and()
    -                .commit();
    +        while (true) {
    +            try {
    +                client.inTransaction()
    +                        .create()
    +                        .withMode(CreateMode.PERSISTENT)
    +                        .forPath(path, serializedStoreHandle)
    +                        .and()
    +                        .create()
    +                        .withMode(CreateMode.EPHEMERAL)
    +                        .forPath(getLockPath(path))
    +                        .and()
    +                        .commit();
    +                break;
    +            } catch (KeeperException.NodeExistsException e) {
    +                // Transactions are not i