Author: ekoifman Date: Mon Oct 20 18:44:12 2014 New Revision: 1633197 URL: http://svn.apache.org/r1633197 Log: HIVE-8387 add retry logic to ZooKeeperStorage in WebHCat
Modified: hive/trunk/hcatalog/webhcat/svr/pom.xml hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hive/trunk/pom.xml Modified: hive/trunk/hcatalog/webhcat/svr/pom.xml URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/pom.xml?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/pom.xml (original) +++ hive/trunk/hcatalog/webhcat/svr/pom.xml Mon Oct 20 18:44:12 2014 @@ -38,7 +38,7 @@ </properties> <dependencies> - <!-- dependencies are always listed in sorted order by groupId, artifectId --> + <!-- dependencies are always listed in sorted order by groupId, artifactId --> <!-- intra-project --> <dependency> <groupId>org.apache.hive.hcatalog</groupId> @@ -72,6 +72,14 @@ <artifactId>commons-exec</artifactId> <version>${commons-exec.version}</version> </dependency> + + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> @@ -195,6 +203,37 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>include-curator</id> + <!-- WebHCat uses Curator library to work with ZooKeeper. Thus it must be available + on a random node in the cluster where LaunchMapper runs to actually execute the job. + The simplest way is to include it in webhcat jar which is shipped to target node since + it contains LaunchMapper.java.--> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <minimizeJar>true</minimizeJar> + <artifactSet> + <includes> + <include>org.apache.curator</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>org.apache.curator</pattern> + <shadedPattern>webhcat.org.apache.curator</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java Mon Oct 20 18:44:12 2014 @@ -25,9 +25,7 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -116,32 +114,6 @@ public class HDFSStorage implements Temp } @Override - public Map<String, String> getFields(Type type, String id) { - HashMap<String, String> map = new HashMap<String, String>(); - BufferedReader in = null; - Path p = new Path(getPath(type) + "/" + id); - try { - for (FileStatus status : fs.listStatus(p)) { - in = new BufferedReader(new InputStreamReader(fs.open(status.getPath()))); - String line = null; - String val = ""; - while ((line = in.readLine()) != null) { - if (!val.equals("")) { - val += "\n"; - } - val += line; - } - map.put(status.getPath().getName(), val); - } - } catch (IOException e) { - LOG.trace("Couldn't find " + p); - } finally { - close(in); - } - return map; - } - - @Override public boolean delete(Type type, String id) throws NotFoundException { Path p = new Path(getPath(type) + "/" + id); try { @@ -153,14 +125,6 @@ public class HDFSStorage implements Temp return false; } - @Override - public List<String> getAll() { - ArrayList<String> allNodes = new ArrayList<String>(); - for (Type type : Type.values()) { - allNodes.addAll(getAllForType(type)); - } - return allNodes; - } @Override public List<String> getAllForType(Type type) { @@ -177,40 +141,6 @@ public class HDFSStorage implements Temp } @Override - public List<String> getAllForKey(String key, String value) { - ArrayList<String> allNodes = new ArrayList<String>(); - try { - for (Type type : Type.values()) { - allNodes.addAll(getAllForTypeAndKey(type, key, value)); - } - } catch (Exception e) { - LOG.trace("Couldn't find children for key " + key + ": " + - e.getMessage()); - } - return allNodes; - } - - @Override - public List<String> getAllForTypeAndKey(Type type, String key, String value) { - ArrayList<String> allNodes = new ArrayList<String>(); - HashMap<String, String> map = new HashMap<String, String>(); - try { - for (FileStatus status : - fs.listStatus(new Path(getPath(type)))) { - map = (HashMap<String, String>) - getFields(type, status.getPath().getName()); - if (map.get(key).equals(value)) { - allNodes.add(status.getPath().getName()); - } - } - } catch (Exception e) { - LOG.trace("Couldn't find children for key " + key + ": " + - e.getMessage()); - } - return allNodes; - } - - @Override public void openStorage(Configuration config) throws IOException { storage_root = config.get(TempletonStorage.STORAGE_ROOT); if (fs == null) { Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java Mon Oct 20 18:44:12 2014 @@ -24,19 +24,30 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; +/* + * The general idea here is to create + * /created/1 + * /created/2 + * /created/3 .... + * for each job submitted. The node number is generated by ZK (PERSISTENT_SEQUENTIAL) and the + * payload is the JobId. Basically this keeps track of the order in which jobs were submitted, + * and ZooKeeperCleanup uses this to purge old job info. + * Since the /jobs/<id> node has a create/update timestamp + * (http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#sc_zkStatStructure) this whole + * thing can be removed. +*/ public class JobStateTracker { // The path to the tracking root private String job_trackingroot = null; // The zookeeper connection to use - private ZooKeeper zk; + private CuratorFramework zk; // The id of the tracking node -- must be a SEQUENTIAL node private String trackingnode; @@ -51,7 +62,7 @@ public class JobStateTracker { * Constructor for a new node -- takes the jobid of an existing job * */ - public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker, + public JobStateTracker(String node, CuratorFramework zk, boolean nodeIsTracker, String job_trackingpath) { this.zk = zk; if (nodeIsTracker) { @@ -65,30 +76,25 @@ public class JobStateTracker { /** * Create the parent znode for this job state. */ - public void create() - throws IOException { - String[] paths = ZooKeeperStorage.getPaths(job_trackingroot); - for (String znode : paths) { - try { - zk.create(znode, new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - } catch (Exception e) { - throw new IOException("Unable to create parent nodes"); - } + public void create() throws IOException { + try { + zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(Ids.OPEN_ACL_UNSAFE).forPath(job_trackingroot); + } catch (KeeperException.NodeExistsException e) { + //root must exist already + } catch (Exception e) { + throw new IOException("Unable to create parent nodes"); } try { - trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + trackingnode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL) + .withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeTrackingZnode(), jobid.getBytes()); } catch (Exception e) { throw new IOException("Unable to create " + makeTrackingZnode()); } } - - public void delete() - throws IOException { + public void delete() throws IOException { try { - zk.delete(makeTrackingJobZnode(trackingnode), -1); + zk.delete().forPath(makeTrackingJobZnode(trackingnode)); } catch (Exception e) { // Might have been deleted already LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode)); @@ -101,13 +107,10 @@ public class JobStateTracker { */ public String getJobID() throws IOException { try { - return new String(zk.getData(makeTrackingJobZnode(trackingnode), - false, new Stat())); - } catch (KeeperException e) { + return new String(zk.getData().forPath(makeTrackingJobZnode(trackingnode))); + } catch (Exception e) { // It was deleted during the transaction - throw new IOException("Node already deleted " + trackingnode); - } catch (InterruptedException e) { - throw new IOException("Couldn't read node " + trackingnode); + throw new IOException("Node already deleted " + trackingnode, e); } } @@ -129,13 +132,13 @@ public class JobStateTracker { * Get the list of tracking jobs. These can be used to determine which jobs have * expired. */ - public static List<String> getTrackingJobs(Configuration conf, ZooKeeper zk) + public static List<String> getTrackingJobs(Configuration conf, CuratorFramework zk) throws IOException { ArrayList<String> jobs = new ArrayList<String>(); try { - for (String myid : zk.getChildren( + for (String myid : zk.getChildren().forPath( conf.get(TempletonStorage.STORAGE_ROOT) - + ZooKeeperStorage.TRACKINGDIR, false)) { + + ZooKeeperStorage.TRACKINGDIR)) { jobs.add(myid); } } catch (Exception e) { Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java Mon Oct 20 18:44:12 2014 @@ -20,7 +20,6 @@ package org.apache.hive.hcatalog.templet import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -45,7 +44,7 @@ import org.apache.hadoop.conf.Configurat public interface TempletonStorage { // These are the possible types referenced by 'type' below. public enum Type { - UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD + UNKNOWN, JOB, JOBTRACKING } public static final String STORAGE_CLASS = "templeton.storage.class"; @@ -79,20 +78,6 @@ public interface TempletonStorage { public String getField(Type type, String id, String key); /** - * Get all the name/value pairs stored for this id. - * Be careful using getFields() -- optimistic locking will mean that - * your odds of a conflict are decreased if you read/write one field - * at a time. getFields() is intended for read-only usage. - * - * If the type is UNKNOWN, search for the id in all types. - * - * @param type The data type (as listed above) - * @param id The String id of this data grouping (jobid, etc.) - * @return A Map of key/value pairs found for this type/id. - */ - public Map<String, String> getFields(Type type, String id); - - /** * Delete a data grouping (all data for a jobid, all tracking data * for a job, etc.). If the type is UNKNOWN, search for the id * in all types. @@ -105,13 +90,6 @@ public interface TempletonStorage { public boolean delete(Type type, String id) throws NotFoundException; /** - * Get the id of each data grouping in the storage system. - * - * @return An ArrayList<String> of ids. - */ - public List<String> getAll(); - - /** * Get the id of each data grouping of a given type in the storage * system. * @param type The data type (as listed above) @@ -120,26 +98,6 @@ public interface TempletonStorage { public List<String> getAllForType(Type type); /** - * Get the id of each data grouping that has the specific key/value - * pair. - * @param key The name of the field to search for - * @param value The value of the field to search for - * @return An ArrayList<String> of ids. - */ - public List<String> getAllForKey(String key, String value); - - /** - * Get the id of each data grouping of a given type that has the - * specific key/value pair. - * @param type The data type (as listed above) - * @param key The name of the field to search for - * @param value The value of the field to search for - * @return An ArrayList<String> of ids. - */ - public List<String> getAllForTypeAndKey(Type type, String key, - String value); - - /** * For storage methods that require a connection, this is a hint * that it's time to open a connection. */ Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java Mon Oct 20 18:44:12 2014 @@ -24,8 +24,8 @@ import java.util.Collections; import java.util.List; import java.util.Date; +import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.conf.Configuration; -import org.apache.zookeeper.ZooKeeper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -89,7 +89,7 @@ public class ZooKeeperCleanup extends Th * @throws IOException */ public void run() { - ZooKeeper zk = null; + CuratorFramework zk = null; List<String> nodes = null; isRunning = true; while (!stop) { @@ -112,13 +112,7 @@ public class ZooKeeperCleanup extends Th } catch (Exception e) { LOG.error("Cleanup cycle failed: " + e.getMessage()); } finally { - if (zk != null) { - try { - zk.close(); - } catch (InterruptedException e) { - // We're trying to exit anyway, just ignore. - } - } + if (zk != null) zk.close(); } long sleepMillis = (long) (Math.random() * interval); @@ -140,7 +134,7 @@ public class ZooKeeperCleanup extends Th * * @throws IOException */ - public List<String> getChildList(ZooKeeper zk) { + public List<String> getChildList(CuratorFramework zk) { try { List<String> jobs = JobStateTracker.getTrackingJobs(appConf, zk); Collections.sort(jobs); @@ -154,7 +148,7 @@ public class ZooKeeperCleanup extends Th /** * Check to see if a job is more than maxage old, and delete it if so. */ - public boolean checkAndDelete(String node, ZooKeeper zk) { + public boolean checkAndDelete(String node, CuratorFramework zk) { JobState state = null; try { JobStateTracker tracker = new JobStateTracker(node, zk, true, @@ -167,8 +161,11 @@ public class ZooKeeperCleanup extends Th // an error in creation, and we want to delete it anyway. long then = 0; if (state.getCreated() != null) { + //this is set in ZooKeeperStorage.create() then = state.getCreated(); } + //todo: this should check that the job actually completed and likely use completion time + //which is not tracked directly but available on /jobs/<id> node via "mtime" in Stat if (now - then > maxage) { LOG.info("Deleting " + tracker.getJobID()); state.delete(); @@ -177,7 +174,7 @@ public class ZooKeeperCleanup extends Th } return false; } catch (Exception e) { - LOG.info("checkAndDelete failed for " + node); + LOG.info("checkAndDelete failed for " + node + " due to: " + e.getMessage()); // We don't throw a new exception for this -- just keep going with the // next one. return true; Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java Mon Oct 20 18:44:12 2014 @@ -19,21 +19,18 @@ package org.apache.hive.hcatalog.templeton.tool; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; /** * A storage implementation based on storing everything in ZooKeeper. @@ -60,29 +57,29 @@ public class ZooKeeperStorage implements private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class); - private ZooKeeper zk; + private CuratorFramework zk; /** * Open a ZooKeeper connection for the JobState. */ - public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout) + public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs) throws IOException { - return new ZooKeeper(zkHosts, - zkSessionTimeout, - new Watcher() { - @Override - synchronized public void process(WatchedEvent event) { - } - }); + //do we need to add a connection status listener? What will that do? + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs, + CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy); + zk.start(); + return zk; } /** * Open a ZooKeeper connection for the JobState. */ - public static ZooKeeper zkOpen(Configuration conf) - throws IOException { + public static CuratorFramework zkOpen(Configuration conf) throws IOException { + /*the silly looking call to Builder below is to get the default value of session timeout + from Curator which itself exposes it as system property*/ return zkOpen(conf.get(ZK_HOSTS), - conf.getInt(ZK_SESSION_TIMEOUT, 30000)); + conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs())); } public ZooKeeperStorage() { @@ -93,15 +90,9 @@ public class ZooKeeperStorage implements /** * Close this ZK connection. */ - public void close() - throws IOException { + public void close() throws IOException { if (zk != null) { - try { - zk.close(); - zk = null; - } catch (InterruptedException e) { - throw new IOException("Closing ZooKeeper connection", e); - } + zk.close(); } } @@ -118,48 +109,54 @@ public class ZooKeeperStorage implements */ public void create(Type type, String id) throws IOException { + boolean wasCreated = false; try { - String[] paths = getPaths(makeZnode(type, id)); - boolean wasCreated = false; - for (String znode : paths) { - try { - zk.create(znode, new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - wasCreated = true; - } catch (KeeperException.NodeExistsException e) { + zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeZnode(type, id)); + wasCreated = true; + } + catch(KeeperException.NodeExistsException ex) { + //we just created top level node for this jobId + } + catch(Exception ex) { + throw new IOException("Error creating " + makeZnode(type, id), ex); + } + if(wasCreated) { + try { + // Really not sure if this should go here. Will have + // to see how the storage mechanism evolves. + if (type.equals(Type.JOB)) { + JobStateTracker jt = new JobStateTracker(id, zk, false, job_trackingpath); + jt.create(); } - } - if (wasCreated) { + } catch (Exception e) { + LOG.error("Error tracking (jobId=" + id + "): " + e.getMessage()); + // If we couldn't create the tracker node, don't create the main node. try { - // Really not sure if this should go here. Will have - // to see how the storage mechanism evolves. - if (type.equals(Type.JOB)) { - JobStateTracker jt = new JobStateTracker(id, zk, false, - job_trackingpath); - jt.create(); - } - } catch (Exception e) { - LOG.warn("Error tracking: " + e.getMessage()); - // If we couldn't create the tracker node, don't - // create the main node. - zk.delete(makeZnode(type, id), -1); + zk.delete().forPath(makeZnode(type, id));//default version is -1 + } + catch(Exception ex) { + //EK: it's not obvious that this is the right logic, if we don't record the 'callback' + //for example and never notify the client of job completion + throw new IOException("Failed to delete " + makeZnode(type, id) + ":" + ex); } } - if (zk.exists(makeZnode(type, id), false) == null) + } + try { + if (zk.checkExists().forPath(makeZnode(type, id)) == null) { throw new IOException("Unable to create " + makeZnode(type, id)); - if (wasCreated) { - try { - saveField(type, id, "created", - Long.toString(System.currentTimeMillis())); - } catch (NotFoundException nfe) { - // Wow, something's really wrong. - throw new IOException("Couldn't write to node " + id, nfe); - } } - } catch (KeeperException e) { - throw new IOException("Creating " + id, e); - } catch (InterruptedException e) { - throw new IOException("Creating " + id, e); + } + catch (Exception ex) { + throw new IOException(ex); + } + if (wasCreated) { + try { + saveField(type, id, "created", + Long.toString(System.currentTimeMillis())); + } catch (NotFoundException nfe) { + // Wow, something's really wrong. + throw new IOException("Couldn't write to node " + id, nfe); + } } } @@ -198,25 +195,14 @@ public class ZooKeeperStorage implements /** * A helper method that sets a field value. - * @param type - * @param id - * @param name - * @param val - * @throws KeeperException - * @throws UnsupportedEncodingException - * @throws InterruptedException + * @throws java.lang.Exception */ - private void setFieldData(Type type, String id, String name, String val) - throws KeeperException, UnsupportedEncodingException, InterruptedException { + private void setFieldData(Type type, String id, String name, String val) throws Exception { try { - zk.create(makeFieldZnode(type, id, name), - val.getBytes(ENCODING), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zk.create().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE) + .forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING)); } catch (KeeperException.NodeExistsException e) { - zk.setData(makeFieldZnode(type, id, name), - val.getBytes(ENCODING), - -1); + zk.setData().forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING)); } } @@ -251,7 +237,7 @@ public class ZooKeeperStorage implements @Override public String getField(Type type, String id, String key) { try { - byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null); + byte[] b = zk.getData().forPath(makeFieldZnode(type, id, key)); return new String(b, ENCODING); } catch (Exception e) { return null; @@ -259,26 +245,12 @@ public class ZooKeeperStorage implements } @Override - public Map<String, String> getFields(Type type, String id) { - HashMap<String, String> map = new HashMap<String, String>(); - try { - for (String node : zk.getChildren(makeZnode(type, id), false)) { - byte[] b = zk.getData(makeFieldZnode(type, id, node), - false, null); - map.put(node, new String(b, ENCODING)); - } - } catch (Exception e) { - return map; - } - return map; - } - - @Override public boolean delete(Type type, String id) throws NotFoundException { try { - for (String child : zk.getChildren(makeZnode(type, id), false)) { + + for (String child : zk.getChildren().forPath(makeZnode(type, id))) { try { - zk.delete(makeFieldZnode(type, id, child), -1); + zk.delete().forPath(makeFieldZnode(type, id, child)); } catch (Exception e) { // Other nodes may be trying to delete this at the same time, // so just log errors and skip them. @@ -287,7 +259,7 @@ public class ZooKeeperStorage implements } } try { - zk.delete(makeZnode(type, id), -1); + zk.delete().forPath(makeZnode(type, id)); } catch (Exception e) { // Same thing -- might be deleted by other nodes, so just go on. throw new NotFoundException("Couldn't delete " + @@ -302,58 +274,15 @@ public class ZooKeeperStorage implements } @Override - public List<String> getAll() { - ArrayList<String> allNodes = new ArrayList<String>(); - for (Type type : Type.values()) { - allNodes.addAll(getAllForType(type)); - } - return allNodes; - } - - @Override public List<String> getAllForType(Type type) { try { - return zk.getChildren(getPath(type), false); + return zk.getChildren().forPath(getPath(type)); } catch (Exception e) { return new ArrayList<String>(); } } @Override - public List<String> getAllForKey(String key, String value) { - ArrayList<String> allNodes = new ArrayList<String>(); - try { - for (Type type : Type.values()) { - allNodes.addAll(getAllForTypeAndKey(type, key, value)); - } - } catch (Exception e) { - LOG.info("Couldn't find children."); - } - return allNodes; - } - - @Override - public List<String> getAllForTypeAndKey(Type type, String key, String value) { - ArrayList<String> allNodes = new ArrayList<String>(); - try { - for (String id : zk.getChildren(getPath(type), false)) { - for (String field : zk.getChildren(id, false)) { - if (field.endsWith("/" + key)) { - byte[] b = zk.getData(field, false, null); - if (new String(b, ENCODING).equals(value)) { - allNodes.add(id); - } - } - } - } - } catch (Exception e) { - // Log and go to the next type -- this one might not exist - LOG.info("Couldn't find children of " + getPath(type)); - } - return allNodes; - } - - @Override public void openStorage(Configuration config) throws IOException { storage_root = config.get(STORAGE_ROOT); job_path = storage_root + "/jobs"; Modified: hive/trunk/pom.xml URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1633197&r1=1633196&r2=1633197&view=diff ============================================================================== --- hive/trunk/pom.xml (original) +++ hive/trunk/pom.xml Mon Oct 20 18:44:12 2014 @@ -161,6 +161,7 @@ <zookeeper.version>3.4.5</zookeeper.version> <jpam.version>1.1</jpam.version> <felix.version>2.4.0</felix.version> + <curator.version>2.5.0</curator.version> </properties> <repositories> @@ -472,7 +473,13 @@ </exclusion> </exclusions> </dependency> - <dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> + + <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> <version>${groovy.version}</version>