YARN-7053. Move curator transaction support to ZKCuratorManager. (Jonathan Hung via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4249172e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4249172e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4249172e Branch: refs/heads/YARN-5734 Commit: 4249172e1419acdb2b69ae3db43dc59da2aa2e03 Parents: c379310 Author: Subru Krishnan <[email protected]> Authored: Tue Aug 22 19:20:57 2017 -0700 Committer: Subru Krishnan <[email protected]> Committed: Tue Aug 22 19:20:57 2017 -0700 ---------------------------------------------------------------------- .../hadoop/util/curator/ZKCuratorManager.java | 88 +++++++++++- .../util/curator/TestZKCuratorManager.java | 39 ++++++ .../recovery/ZKRMStateStore.java | 139 ++++++------------- 3 files changed, 164 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4249172e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java index 9a031af..e1efcb5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java @@ -26,6 +26,8 @@ import java.util.List; import org.apache.curator.framework.AuthInfo; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; import org.apache.curator.retry.RetryNTimes; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -54,7 +56,6 @@ public final class ZKCuratorManager { /** Curator for ZooKeeper. */ private CuratorFramework curator; - public ZKCuratorManager(Configuration config) throws IOException { this.conf = config; } @@ -119,7 +120,6 @@ public final class ZKCuratorManager { /** * Start the connection to the ZooKeeper ensemble. - * @param conf Configuration for the connection. * @throws IOException If the connection cannot be started. */ public void start() throws IOException { @@ -128,7 +128,6 @@ public final class ZKCuratorManager { /** * Start the connection to the ZooKeeper ensemble. - * @param conf Configuration for the connection. * @param authInfos List of authentication keys. * @throws IOException If the connection cannot be started. */ @@ -337,4 +336,87 @@ public final class ZKCuratorManager { public static String getNodePath(String root, String nodeName) { return root + "/" + nodeName; } + + public void safeCreate(String path, byte[] data, List<ACL> acl, + CreateMode mode, List<ACL> fencingACL, String fencingNodePath) + throws Exception { + if (!exists(path)) { + SafeTransaction transaction = createTransaction(fencingACL, + fencingNodePath); + transaction.create(path, data, acl, mode); + transaction.commit(); + } + } + + /** + * Deletes the path. Checks for existence of path as well. + * @param path Path to be deleted. + * @throws Exception if any problem occurs while performing deletion. + */ + public void safeDelete(final String path, List<ACL> fencingACL, + String fencingNodePath) throws Exception { + if (exists(path)) { + SafeTransaction transaction = createTransaction(fencingACL, + fencingNodePath); + transaction.delete(path); + transaction.commit(); + } + } + + public void safeSetData(String path, byte[] data, int version, + List<ACL> fencingACL, String fencingNodePath) + throws Exception { + SafeTransaction transaction = createTransaction(fencingACL, + fencingNodePath); + transaction.setData(path, data, version); + transaction.commit(); + } + + public SafeTransaction createTransaction(List<ACL> fencingACL, + String fencingNodePath) throws Exception { + return new SafeTransaction(fencingACL, fencingNodePath); + } + + /** + * Use curator transactions to ensure zk-operations are performed in an all + * or nothing fashion. This is equivalent to using ZooKeeper#multi. + * + * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll + * have to rewrite this inner class when we adopt that. + */ + public class SafeTransaction { + private CuratorTransactionFinal transactionFinal; + private String fencingNodePath; + + SafeTransaction(List<ACL> fencingACL, String fencingNodePath) + throws Exception { + this.fencingNodePath = fencingNodePath; + CuratorTransaction transaction = curator.inTransaction(); + transactionFinal = transaction.create() + .withMode(CreateMode.PERSISTENT).withACL(fencingACL) + .forPath(fencingNodePath, new byte[0]).and(); + } + + public void commit() throws Exception { + transactionFinal = transactionFinal.delete() + .forPath(fencingNodePath).and(); + transactionFinal.commit(); + } + + public void create(String path, byte[] data, List<ACL> acl, CreateMode mode) + throws Exception { + transactionFinal = transactionFinal.create() + .withMode(mode).withACL(acl).forPath(path, data).and(); + } + + public void delete(String path) throws Exception { + transactionFinal = transactionFinal.delete().forPath(path).and(); + } + + public void setData(String path, byte[] data, int version) + throws Exception { + transactionFinal = transactionFinal.setData() + .withVersion(version).forPath(path, data).and(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4249172e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java index 3e78a44..486e89a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java @@ -21,11 +21,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.Arrays; import java.util.List; import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.util.ZKUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -92,4 +96,39 @@ public class TestZKCuratorManager { children = curator.getChildren("/"); assertEquals(2, children.size()); } + + @Test + public void testTransaction() throws Exception { + List<ACL> zkAcl = ZKUtil.parseACLs(CommonConfigurationKeys.ZK_ACL_DEFAULT); + String fencingNodePath = "/fencing"; + String node1 = "/node1"; + String node2 = "/node2"; + byte[] testData = "testData".getBytes("UTF-8"); + assertFalse(curator.exists(fencingNodePath)); + assertFalse(curator.exists(node1)); + assertFalse(curator.exists(node2)); + ZKCuratorManager.SafeTransaction txn = curator.createTransaction( + zkAcl, fencingNodePath); + txn.create(node1, testData, zkAcl, CreateMode.PERSISTENT); + txn.create(node2, testData, zkAcl, CreateMode.PERSISTENT); + assertFalse(curator.exists(fencingNodePath)); + assertFalse(curator.exists(node1)); + assertFalse(curator.exists(node2)); + txn.commit(); + assertFalse(curator.exists(fencingNodePath)); + assertTrue(curator.exists(node1)); + assertTrue(curator.exists(node2)); + assertTrue(Arrays.equals(testData, curator.getData(node1))); + assertTrue(Arrays.equals(testData, curator.getData(node2))); + + byte[] setData = "setData".getBytes("UTF-8"); + txn = curator.createTransaction(zkAcl, fencingNodePath); + txn.setData(node1, setData, -1); + txn.delete(node2); + assertTrue(curator.exists(node2)); + assertTrue(Arrays.equals(testData, curator.getData(node1))); + txn.commit(); + assertFalse(curator.exists(node2)); + assertTrue(Arrays.equals(setData, curator.getData(node1))); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4249172e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index a445e75..ac67dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.transaction.CuratorTransaction; -import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -31,6 +29,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -416,9 +415,10 @@ public class ZKRMStateStore extends RMStateStore { ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (exists(versionNodePath)) { - safeSetData(versionNodePath, data, -1); + zkManager.safeSetData(versionNodePath, data, -1, zkAcl, fencingNodePath); } else { - safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); } } @@ -447,12 +447,14 @@ public class ZKRMStateStore extends RMStateStore { // increment epoch and store it byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - safeSetData(epochNodePath, storeData, -1); + zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl, + fencingNodePath); } else { // initialize epoch node with 1 for the next time. byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(epochNodePath, storeData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); } return currentEpoch; @@ -721,7 +723,7 @@ public class ZKRMStateStore extends RMStateStore { // No apps stored under parent path. if (children != null && children.isEmpty()) { try { - safeDelete(parentAppNode); + zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath); if (LOG.isDebugEnabled()) { LOG.debug("No leaf app node exists. Removing parent node " + parentAppNode); @@ -749,7 +751,8 @@ public class ZKRMStateStore extends RMStateStore { byte[] appStateData = appStateDataPB.getProto().toByteArray(); if (appStateData.length <= zknodeLimit) { - safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); } else { if (LOG.isDebugEnabled()) { LOG.debug("Application state data size for " + appId + " is " @@ -780,7 +783,8 @@ public class ZKRMStateStore extends RMStateStore { String rootNode = getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex); if (!exists(rootNode)) { - safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); } } } @@ -794,9 +798,11 @@ public class ZKRMStateStore extends RMStateStore { byte[] appStateData = appStateDataPB.getProto().toByteArray(); if (pathExists) { - safeSetData(nodeUpdatePath, appStateData, -1); + zkManager.safeSetData(nodeUpdatePath, appStateData, -1, zkAcl, + fencingNodePath); } else { - safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(nodeUpdatePath, appStateData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); if (LOG.isDebugEnabled()) { LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " + "exist. Creating a new znode to update the application state."); @@ -839,9 +845,11 @@ public class ZKRMStateStore extends RMStateStore { switch (operation) { case UPDATE: if (exists(path)) { - safeSetData(path, attemptStateData, -1); + zkManager.safeSetData(path, attemptStateData, -1, zkAcl, + fencingNodePath); } else { - safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(path, attemptStateData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); if (LOG.isDebugEnabled()) { LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." + " Created a new znode to update the application attempt state."); @@ -849,10 +857,11 @@ public class ZKRMStateStore extends RMStateStore { } break; case STORE: - safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); break; case REMOVE: - safeDelete(path); + zkManager.safeDelete(path, zkAcl, fencingNodePath); break; default: break; @@ -930,10 +939,10 @@ public class ZKRMStateStore extends RMStateStore { for (ApplicationAttemptId attemptId : attempts) { String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); - safeDelete(attemptRemovePath); + zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath); } } - safeDelete(appIdRemovePath); + zkManager.safeDelete(appIdRemovePath, zkAcl, fencingNodePath); } else { CuratorFramework curatorFramework = zkManager.getCurator(); curatorFramework.delete().deletingChildrenIfNeeded(). @@ -947,7 +956,7 @@ public class ZKRMStateStore extends RMStateStore { protected synchronized void storeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - SafeTransaction trx = new SafeTransaction(); + SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); trx.commit(); } @@ -964,14 +973,14 @@ public class ZKRMStateStore extends RMStateStore { + rmDTIdentifier.getSequenceNumber()); } - safeDelete(nodeRemovePath); + zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath); } @Override protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - SafeTransaction trx = new SafeTransaction(); + SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); @@ -1035,8 +1044,8 @@ public class ZKRMStateStore extends RMStateStore { ByteArrayOutputStream os = new ByteArrayOutputStream(); try(DataOutputStream fsOut = new DataOutputStream(os)) { delegationKey.write(fsOut); - safeCreate(nodeCreatePath, os.toByteArray(), zkAcl, - CreateMode.PERSISTENT); + zkManager.safeCreate(nodeCreatePath, os.toByteArray(), zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); } } @@ -1051,7 +1060,7 @@ public class ZKRMStateStore extends RMStateStore { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - safeDelete(nodeRemovePath); + zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath); } @Override @@ -1078,7 +1087,8 @@ public class ZKRMStateStore extends RMStateStore { AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); byte[] stateData = data.getProto().toByteArray(); - safeSetData(amrmTokenSecretManagerRoot, stateData, -1); + zkManager.safeSetData(amrmTokenSecretManagerRoot, stateData, -1, zkAcl, + fencingNodePath); } @Override @@ -1092,12 +1102,12 @@ public class ZKRMStateStore extends RMStateStore { + " for" + " plan " + planName); } - safeDelete(reservationPath); + zkManager.safeDelete(reservationPath, zkAcl, fencingNodePath); List<String> reservationNodes = getChildren(planNodePath); if (reservationNodes.isEmpty()) { - safeDelete(planNodePath); + zkManager.safeDelete(planNodePath, zkAcl, fencingNodePath); } } @@ -1105,7 +1115,7 @@ public class ZKRMStateStore extends RMStateStore { protected synchronized void storeReservationState( ReservationAllocationStateProto reservationAllocation, String planName, String reservationIdName) throws Exception { - SafeTransaction trx = new SafeTransaction(); + SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); addOrUpdateReservationState(reservationAllocation, planName, reservationIdName, trx, false); trx.commit(); @@ -1191,7 +1201,8 @@ public class ZKRMStateStore extends RMStateStore { getNodePath(rootNode, nodeName.substring(0, splitIdx)); if (createParentIfNotExists && !exists(rootNodePath)) { try { - safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); } catch (KeeperException.NodeExistsException e) { if (LOG.isDebugEnabled()) { LOG.debug("Unable to create app parent node " + rootNodePath + @@ -1248,76 +1259,6 @@ public class ZKRMStateStore extends RMStateStore { zkManager.delete(path); } - private void safeCreate(String path, byte[] data, List<ACL> acl, - CreateMode mode) throws Exception { - if (!exists(path)) { - SafeTransaction transaction = new SafeTransaction(); - transaction.create(path, data, acl, mode); - transaction.commit(); - } - } - - /** - * Deletes the path. Checks for existence of path as well. - * @param path Path to be deleted. - * @throws Exception if any problem occurs while performing deletion. - */ - private void safeDelete(final String path) throws Exception { - if (exists(path)) { - SafeTransaction transaction = new SafeTransaction(); - transaction.delete(path); - transaction.commit(); - } - } - - private void safeSetData(String path, byte[] data, int version) - throws Exception { - SafeTransaction transaction = new SafeTransaction(); - transaction.setData(path, data, version); - transaction.commit(); - } - - /** - * Use curator transactions to ensure zk-operations are performed in an all - * or nothing fashion. This is equivalent to using ZooKeeper#multi. - * - * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll - * have to rewrite this inner class when we adopt that. - */ - private class SafeTransaction { - private CuratorTransactionFinal transactionFinal; - - SafeTransaction() throws Exception { - CuratorFramework curatorFramework = zkManager.getCurator(); - CuratorTransaction transaction = curatorFramework.inTransaction(); - transactionFinal = transaction.create() - .withMode(CreateMode.PERSISTENT).withACL(zkAcl) - .forPath(fencingNodePath, new byte[0]).and(); - } - - public void commit() throws Exception { - transactionFinal = transactionFinal.delete() - .forPath(fencingNodePath).and(); - transactionFinal.commit(); - } - - public void create(String path, byte[] data, List<ACL> acl, CreateMode mode) - throws Exception { - transactionFinal = transactionFinal.create() - .withMode(mode).withACL(acl).forPath(path, data).and(); - } - - public void delete(String path) throws Exception { - transactionFinal = transactionFinal.delete().forPath(path).and(); - } - - public void setData(String path, byte[] data, int version) - throws Exception { - transactionFinal = transactionFinal.setData() - .withVersion(version).forPath(path, data).and(); - } - } - /** * Helper class that periodically attempts creating a znode to ensure that * this RM continues to be the Active. @@ -1332,7 +1273,7 @@ public class ZKRMStateStore extends RMStateStore { try { while (!isFencedState()) { // Create and delete fencing node - new SafeTransaction().commit(); + zkManager.createTransaction(zkAcl, fencingNodePath).commit(); Thread.sleep(zkSessionTimeout); } } catch (InterruptedException ie) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
