Repository: airavata Updated Branches: refs/heads/master 68e81ef80 -> 45f0d68fd
GFac pull monitor Write job count to zookeeper Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/78ad2ef5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/78ad2ef5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/78ad2ef5 Branch: refs/heads/master Commit: 78ad2ef58b04051bc5c41791b7c03f90da914e5c Parents: 92ad9f1 Author: shamrath <[email protected]> Authored: Thu Sep 11 21:35:41 2014 -0400 Committer: shamrath <[email protected]> Committed: Thu Sep 11 21:35:41 2014 -0400 ---------------------------------------------------------------------- .../apache/airavata/common/utils/Constants.java | 2 + .../monitor/impl/pull/qstat/HPCPullMonitor.java | 109 ++++++++++++++++++- 2 files changed, 110 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java index b8f999a..8335e0c 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java @@ -46,4 +46,6 @@ public final class Constants { public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name"; public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name"; public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name"; + public static final String STAT = "stat"; + public static final String JOB = "job"; } 
http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 93e1aa9..3742179 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.gfac.monitor.impl.pull.qstat; +import java.io.IOException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Date; @@ -28,9 +29,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; @@ -48,10 +53,15 @@ import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.model.workspace.experiment.JobStatus; import org.apache.airavata.model.workspace.experiment.TaskState; import 
org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -242,6 +252,7 @@ public class HPCPullMonitor extends PullMonitor {
             for (MonitorID completedJob : completedJobs) {
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
             }
+            updateZkWithJobCount(take , completedJobs);
         } catch (InterruptedException e) {
             if (!this.queue.contains(take)) {
                 try {
@@ -304,6 +315,166 @@ public class HPCPullMonitor extends PullMonitor {
         return true;
     }
 
+    /** Session timeout (ms) of the short-lived ZooKeeper session used to publish job counts. */
+    private static final int ZK_SESSION_TIMEOUT_MS = 6000;
+
+    /**
+     * Publish the number of monitored jobs per host of a user to ZooKeeper.
+     * <p>
+     * For every host still present in {@code userMonitorData}, the size of its monitor-ID list is
+     * written to the znode {@code /stat/{username}/{hostAddress}/job} (created on demand). A host that
+     * appears only in {@code completedJobs} - i.e. one that is no longer in {@code userMonitorData} -
+     * gets a count of 0. Finally the colon-separated list of updated paths is written to {@code /stat}
+     * so that a watcher on that znode (presumably the orchestrator) observes the update.
+     * <p>
+     * Failures are logged and not propagated, so a ZooKeeper problem never stops job monitoring; an
+     * interrupt is logged and the thread's interrupt flag is restored.
+     *
+     * @param userMonitorData monitoring data of the user whose jobs were just polled
+     * @param completedJobs   jobs found completed in this poll; this list is not modified
+     */
+    private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID> completedJobs) {
+        ZooKeeper zk = null;
+        try {
+            zk = connectToZk();
+            // job-count znodes written by this call, in write order and without duplicates
+            List<String> updatedPathList = new ArrayList<String>();
+            String pathToUserName = new StringBuilder("/").append(Constants.STAT)
+                    .append("/").append(userMonitorData.getUserName()).toString();
+
+            for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
+                String jobPath = buildJobPath(pathToUserName, hostData.getHost().getType().getHostAddress());
+                List<MonitorID> idList = hostData.getMonitorIDs();
+                int jobCount = (idList == null) ? 0 : idList.size();
+                writeJobCount(zk, jobPath, jobCount, updatedPathList);
+            }
+
+            // If all jobs of a host completed, the monitor queue may have dropped that host, so reset
+            // its count to 0. Hosts already written above are skipped: writing 0 there would erase
+            // the count of jobs still running on them.
+            for (MonitorID monitorID : completedJobs) {
+                String jobPath = buildJobPath(pathToUserName, monitorID.getHost().getType().getHostAddress());
+                if (!updatedPathList.contains(jobPath)) {
+                    writeJobCount(zk, jobPath, 0, updatedPathList);
+                }
+            }
+
+            // trigger orchestrator watcher by saving the updated list to zookeeper
+            if (!updatedPathList.isEmpty()) {
+                StringBuilder strBuilder = new StringBuilder();
+                for (String updatedPath : updatedPathList) {
+                    if (strBuilder.length() > 0) {
+                        strBuilder.append(":");
+                    }
+                    strBuilder.append(updatedPath);
+                }
+                zk.setData("/" + Constants.STAT, strBuilder.toString().getBytes(), -1);
+            }
+        } catch (KeeperException e) {
+            logger.error("Error while storing job count to zookeeper", e);
+        } catch (IOException e) {
+            logger.error("Error while connecting to the zookeeper server", e);
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error while getting zookeeper hostport property", e);
+        } catch (InterruptedException e) {
+            logger.error("Error while storing job count to zookeeper", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            closeZk(zk);
+        }
+    }
+
+    /**
+     * Open a ZooKeeper session and wait, for at most {@link #ZK_SESSION_TIMEOUT_MS} ms, until it
+     * reports {@code SyncConnected}.
+     *
+     * @return a connected session, which the caller must close
+     * @throws IOException if the session cannot be created or does not connect in time
+     * @throws ApplicationSettingsException if the zookeeper host:port setting cannot be read
+     * @throws InterruptedException if interrupted while waiting for the connection
+     */
+    private ZooKeeper connectToZk() throws IOException, ApplicationSettingsException, InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        String hostPort = AiravataZKUtils.getZKhostPort();
+        ZooKeeper zk = new ZooKeeper(hostPort, ZK_SESSION_TIMEOUT_MS, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getState() == Event.KeeperState.SyncConnected) {
+                    latch.countDown();
+                }
+            }
+        });
+        boolean connected = false;
+        try {
+            // bounded wait: an unreachable server must not block the monitoring thread forever
+            connected = latch.await(ZK_SESSION_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS);
+        } finally {
+            if (!connected) {
+                zk.close();
+            }
+        }
+        if (!connected) {
+            throw new IOException("Timed out after " + ZK_SESSION_TIMEOUT_MS
+                    + " ms waiting for a zookeeper connection to " + hostPort);
+        }
+        return zk;
+    }
+
+    /** Close {@code zk} if it is non-null, logging (and re-asserting) an interrupt during close. */
+    private void closeZk(ZooKeeper zk) {
+        if (zk == null) {
+            return;
+        }
+        try {
+            zk.close();
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted while closing the zookeeper session", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Build the job-count znode path {@code {pathToUserName}/{hostAddress}/job}. */
+    private static String buildJobPath(String pathToUserName, String hostAddress) {
+        return new StringBuilder(pathToUserName).append("/").append(hostAddress)
+                .append("/").append(Constants.JOB).toString();
+    }
+
+    /**
+     * Write {@code jobCount} to the znode at {@code jobPath}, creating it and any missing parents
+     * first, and append {@code jobPath} to {@code updatedPathList}.
+     */
+    private void writeJobCount(ZooKeeper zk, String jobPath, int jobCount, List<String> updatedPathList)
+            throws KeeperException, InterruptedException {
+        checkAndCreateZNode(zk, jobPath);
+        zk.setData(jobPath, String.valueOf(jobCount).getBytes(), -1);
+        updatedPathList.add(jobPath);
+    }
+
+    /**
+     * Ensure a persistent znode exists at {@code path}, creating missing parent znodes first.
+     * A node created concurrently by another client between the existence check and the create
+     * call is treated as already existing.
+     *
+     * @param zk - zookeeper instance
+     * @param path - absolute znode path that must exist
+     * @throws KeeperException on any ZooKeeper error other than the node already existing
+     * @throws InterruptedException if interrupted while talking to the server
+     */
+    private void checkAndCreateZNode(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
+        if (zk.exists(path, null) != null) {
+            return; // znode already exists
+        }
+        int parentEnd = path.lastIndexOf("/");
+        if (parentEnd > 0) { // has a parent other than the root "/"
+            checkAndCreateZNode(zk, path.substring(0, parentEnd));
+        }
+        try {
+            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException ignored) {
+            // lost a creation race with another client; the znode now exists, which is all we need
+        }
+    }
+
     /**
      * This is the method to stop the polling process
