Repository: airavata
Updated Branches:
  refs/heads/master 2207eceab -> d1d8759fd
Optimized zookeeper job count update process to update job addition and job deletion steps Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9c23aa81 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9c23aa81 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9c23aa81 Branch: refs/heads/master Commit: 9c23aa81147268b70472df3001bd04575a457e9e Parents: e45607a Author: shamrath <[email protected]> Authored: Fri Sep 12 17:24:59 2014 -0400 Committer: shamrath <[email protected]> Committed: Fri Sep 12 17:24:59 2014 -0400 ---------------------------------------------------------------------- .../handlers/GridPullMonitorHandler.java | 1 + .../monitor/impl/pull/qstat/HPCPullMonitor.java | 160 +++---------------- .../airavata/gfac/monitor/util/CommonUtils.java | 122 ++++++++++++++ 3 files changed, 149 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 9ca5235..ff467bf 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -100,6 +100,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ e.printStackTrace(); } CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); + CommonUtils.increaseZkJobCount(monitorID); // update change job 
count to zookeeper } catch (AiravataMonitorException e) { logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index b4c5819..dac9499 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -20,22 +20,7 @@ */ package org.apache.airavata.gfac.monitor.impl.pull.qstat; -import java.io.IOException; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.Constants; +import com.google.common.eventbus.EventBus; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; @@ -57,16 +42,20 @@ import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.schemas.gfac.GsisshHostType; 
import org.apache.airavata.schemas.gfac.SSHHostType; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.eventbus.EventBus; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; /** * This monitor is based on qstat command which can be run @@ -275,10 +264,24 @@ public class HPCPullMonitor extends PullMonitor { queue.put(take); // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if // they become empty + Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>(); + ZooKeeper zk = null; for (MonitorID completedJob : completedJobs) { CommonUtils.removeMonitorFromQueue(queue, completedJob); + if (zk == null) { + zk = completedJob.getJobExecutionContext().getZk(); + } + String key = CommonUtils.getJobCountUpdatePath(completedJob); + int i = 0; + if (jobRemoveCountMap.containsKey(key)) { + i = Integer.valueOf(jobRemoveCountMap.get(key)); + } + jobRemoveCountMap.put(key, ++i); + } + if (completedJobs.size() > 0) { + // reduce completed job count from zookeeper + CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, false); } -// updateZkWithJobCount(take , completedJobs); } catch (InterruptedException e) { if (!this.queue.contains(take)) { try { @@ -342,117 +345,6 @@ public class HPCPullMonitor extends PullMonitor { } /** - * Build the /stat/{username}/{hostAddress}/job znode path and store job count - * - * @param userMonitorData - * @param completedJobs - * @throws 
ApplicationSettingsException - * @throws IOException - * @throws KeeperException - * @throws InterruptedException - */ - private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID> completedJobs) { - try { - final CountDownLatch latch = new CountDownLatch(1); - ZooKeeper zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - latch.countDown(); - } - } - }); - latch.await(); - - try { - List<String> updatedPathList = new ArrayList<String>(); - String pathToUserName = new StringBuilder("/").append(Constants.STAT) - .append("/").append(userMonitorData.getUserName()).toString(); - StringBuilder jobPathBuilder; - for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) { - jobPathBuilder = new StringBuilder(pathToUserName).append("/") - .append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB); - checkAndCreateZNode(zk, jobPathBuilder.toString()); - int jobCount = 0; - String jobCountStr = new String(zk.getData(jobPathBuilder.toString(), null, null)); - try { - jobCount = Integer.parseInt(jobCountStr); - } catch (NumberFormatException e) { - // do nothing , keep jobCount 0 - } - List<MonitorID> idList = hostData.getMonitorIDs(); - boolean updatePath = true; - if (idList != null) { - if (jobCount == idList.size()) { - updatePath = false; - } else { - jobCount = idList.size(); - } - // removed already updated jobs from complete jobs - for (MonitorID monitorID : idList) { - if (completedJobs.contains(monitorID)) { - completedJobs.remove(monitorID); - } - } - } - if (updatePath) { - zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1); - updatedPathList.add(jobPathBuilder.toString()); - } - } - - //handle completed jobs - /* If all jobs are completed in a host then monitor queue remove such hosts from monitoring ,but we need - to update those 
host's stat with JobCount 0 */ - for (MonitorID monitorID : completedJobs) { - jobPathBuilder = new StringBuilder(pathToUserName).append("/") - .append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB); - zk.setData(jobPathBuilder.toString(), "0".getBytes(), -1); - updatedPathList.add(jobPathBuilder.toString()); - } - // trigger orchestrator watcher by saving the updated list to zookeeper - if (updatedPathList.size() > 0) { - StringBuilder strBuilder = new StringBuilder(); - for (String updatedPath : updatedPathList) { - strBuilder.append(updatedPath).append(":"); - } - strBuilder.deleteCharAt(strBuilder.length() - 1); - zk.setData(("/" + Constants.STAT), strBuilder.toString().getBytes(), -1); - } - zk.close(); - } catch (KeeperException e) { - logger.error("Error while storing job count to zookeeper", e); - } catch (InterruptedException e) { - logger.error("Error while storing job count to zookeeper", e); - } - } catch (IOException e) { - logger.error("Error while connecting to the zookeeper server", e); - } catch (ApplicationSettingsException e) { - logger.error("Error while getting zookeeper hostport property", e); - } catch (InterruptedException e) { - logger.error("Error while waiting for SyncConnected message", e); - } - - } - - /** - * Check whether znode is exist in given path if not create a new znode - * @param zk - zookeeper instance - * @param path - path to check znode - * @throws KeeperException - * @throws InterruptedException - */ - private void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException { - if (zk.exists(path, null) == null) { // if znode doesn't exist - if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist - checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/")))); - } - zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode - } - } - - - /** * This is the method to stop the 
polling process * * @return if the stopping process is successful return true else false http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index 6db4550..9cb544c 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -20,6 +20,9 @@ */ package org.apache.airavata.gfac.monitor.util; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.Constants; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; @@ -30,12 +33,22 @@ import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.schemas.gfac.GsisshHostType; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; public class CommonUtils { 
private final static Logger logger = LoggerFactory.getLogger(CommonUtils.class); @@ -204,4 +217,113 @@ public class CommonUtils { } } } + + /** + * Update job count for a given set of paths. + * @param zk - zookeeper instance + * @param changeCountMap - map of change job count with relevant path + * @param isAdd - Should add or reduce existing job count by the given job count. + */ + public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer> changeCountMap, boolean isAdd) { + StringBuilder changeZNodePaths = new StringBuilder(); + try { + if (zk == null || !zk.getState().isConnected()) { + try { + final CountDownLatch countDownLatch = new CountDownLatch(1); + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() { + @Override + public void process(WatchedEvent event) { + countDownLatch.countDown(); + } + }); + countDownLatch.await(); + } catch (ApplicationSettingsException e) { + logger.error("Error while reading zookeeper hostport string"); + } catch (IOException e) { + logger.error("Error while reconnect attempt to zookeeper where zookeeper connection loss state"); + } + } + + for (String path : changeCountMap.keySet()) { + if (isAdd) { + CommonUtils.checkAndCreateZNode(zk, path); + } + byte[] byteData = zk.getData(path, null, null); + String nodeData; + if (byteData == null) { + if (isAdd) { + zk.setData(path, String.valueOf(changeCountMap.get(path)).getBytes(), -1); + } else { + // This is not possible, but we handle in case there any data zookeeper communication failure + logger.warn("Couldn't reduce job count in " + path + " as it returns null data. 
Hence reset the job count to 0"); + zk.setData(path, "0".getBytes(), -1); + } + } else { + nodeData = new String(byteData); + if (isAdd) { + zk.setData(path, String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes(), -1); + } else { + int previousCount = Integer.parseInt(nodeData); + int removeCount = changeCountMap.get(path); + if (previousCount >= removeCount) { + zk.setData(path, String.valueOf(previousCount - removeCount).getBytes(), -1); + } else { + // This is not possible, do we need to reset the job count to 0 ? + logger.error("Requested remove job count is " + removeCount + + " which is higher than the existing job count " + previousCount + + " in " + path + " path."); + } + } + } + changeZNodePaths.append(path).append(":"); + } + + // update stat node to trigger orchestrator watchers + if (changeCountMap.size() > 0) { + changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1); + zk.setData("/" + Constants.STAT, changeZNodePaths.toString().getBytes(), -1); + } + } catch (KeeperException e) { + logger.error("Error while writing job count to zookeeper", e); + } catch (InterruptedException e) { + logger.error("Error while writing job count to zookeeper", e); + } + + } + + /** + * Increase job count by one and update the zookeeper + * @param monitorID - Job monitorId + */ + public static void increaseZkJobCount(MonitorID monitorID) { + Map<String, Integer> addMap = new HashMap<String, Integer>(); + addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1); + updateZkWithJobCount(monitorID.getJobExecutionContext().getZk(), addMap, true); + } + + /** + * Construct and return the path for a given MonitorID , eg: /stat/{username}/{resourceName}/job + * @param monitorID - Job monitorId + * @return + */ + public static String getJobCountUpdatePath(MonitorID monitorID){ + return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName()) + 
.append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).toString(); + } + + /** + * Check whether znode is exist in given path if not create a new znode + * @param zk - zookeeper instance + * @param path - path to check znode + * @throws KeeperException + * @throws InterruptedException + */ + private static void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException { + if (zk.exists(path, null) == null) { // if znode doesn't exist + if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist + checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/")))); + } + zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode + } + } }
