http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java new file mode 100644 index 0000000..b6e8f0e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java @@ -0,0 +1,1069 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import org.apache.giraph.bsp.CentralizedService; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.partition.GraphPartitionerFactory; +import org.apache.giraph.zk.BspEvent; +import org.apache.giraph.zk.PredicateLock; +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.giraph.zk.ZooKeeperManager; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.json.JSONException; +import org.json.JSONObject; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Zookeeper-based implementation of {@link CentralizedService}. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + */ +@SuppressWarnings("rawtypes") +public abstract class BspService<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + implements Watcher, CentralizedService<I, V, E, M> { + /** Unset superstep */ + public static final long UNSET_SUPERSTEP = Long.MIN_VALUE; + /** Input superstep (superstep when loading the vertices happens) */ + public static final long INPUT_SUPERSTEP = -1; + /** Unset application attempt */ + public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE; + /** Base ZooKeeper directory */ + public static final String BASE_DIR = "/_hadoopBsp"; + /** Master job state znode above base dir */ + public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; + /** Vertex input split directory about base dir */ + public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir"; + /** Vertex input split done directory about base dir */ + public static final String VERTEX_INPUT_SPLIT_DONE_DIR = + "/_vertexInputSplitDoneDir"; + /** Denotes a reserved vertex input split */ + public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE = + "/_vertexInputSplitReserved"; + /** Denotes a finished vertex input split */ + public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE = + "/_vertexInputSplitFinished"; + /** Denotes that all the vertex input splits are are ready for consumption */ + public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE = + "/_vertexInputSplitsAllReady"; + /** Denotes that all the vertex input splits are done. */ + public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE = + "/_vertexInputSplitsAllDone"; + /** Edge input split directory about base dir */ + public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir"; + /** Edge input split done directory about base dir */ + public static final String EDGE_INPUT_SPLIT_DONE_DIR = + "/_edgeInputSplitDoneDir"; + /** Denotes a reserved edge input split */ + public static final String EDGE_INPUT_SPLIT_RESERVED_NODE = + "/_edgeInputSplitReserved"; + /** Denotes a finished edge input split */ + public static final String EDGE_INPUT_SPLIT_FINISHED_NODE = + "/_edgeInputSplitFinished"; + /** Denotes that all the edge input splits are are ready for consumption */ + public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE = + "/_edgeInputSplitsAllReady"; + /** Denotes that all the edge input splits are done. */ + public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE = + "/_edgeInputSplitsAllDone"; + /** Directory of attempts of this application */ + public static final String APPLICATION_ATTEMPTS_DIR = + "/_applicationAttemptsDir"; + /** Where the master election happens */ + public static final String MASTER_ELECTION_DIR = "/_masterElectionDir"; + /** Superstep scope */ + public static final String SUPERSTEP_DIR = "/_superstepDir"; + /** Where the merged aggregators are located */ + public static final String MERGED_AGGREGATOR_DIR = + "/_mergedAggregatorDir"; + /** Healthy workers register here. */ + public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir"; + /** Unhealthy workers register here. */ + public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir"; + /** Workers which wrote checkpoint notify here */ + public static final String WORKER_WROTE_CHECKPOINT_DIR = + "/_workerWroteCheckpointDir"; + /** Finished workers notify here */ + public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir"; + /** Where the master and worker addresses and partition assignments are set */ + public static final String ADDRESSES_AND_PARTITIONS_DIR = + "/_addressesAndPartitions"; + /** Helps coordinate the partition exchnages */ + public static final String PARTITION_EXCHANGE_DIR = + "/_partitionExchangeDir"; + /** Denotes that the superstep is done */ + public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; + /** Denotes which workers have been cleaned up */ + public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; + /** JSON partition stats key */ + public static final String JSONOBJ_PARTITION_STATS_KEY = + "_partitionStatsKey"; + /** JSON message count key */ + public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey"; + /** JSON metrics key */ + public static final String JSONOBJ_METRICS_KEY = "_metricsKey"; + + /** JSON state key */ + public static final String JSONOBJ_STATE_KEY = "_stateKey"; + /** JSON application attempt key */ + public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY = + "_applicationAttemptKey"; + /** JSON superstep key */ + public static final String JSONOBJ_SUPERSTEP_KEY = + "_superstepKey"; + /** Suffix denotes a worker */ + public static final String WORKER_SUFFIX = "_worker"; + /** Suffix denotes a master */ + public static final String MASTER_SUFFIX = "_master"; + /** If at the end of a checkpoint file, indicates metadata */ + public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata"; + /** + * If at the end of a checkpoint file, indicates vertices, edges, + * messages, etc. + */ + public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices"; + /** + * If at the end of a checkpoint file, indicates metadata and data is valid + * for the same filenames without .valid + */ + public static final String CHECKPOINT_VALID_POSTFIX = ".valid"; + /** + * If at the end of a checkpoint file, indicates the stitched checkpoint + * file prefixes. A checkpoint is not valid if this file does not exist. + */ + public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized"; + /** Class logger */ + private static final Logger LOG = Logger.getLogger(BspService.class); + /** Path to the job's root */ + protected final String basePath; + /** Path to the job state determined by the master (informative only) */ + protected final String masterJobStatePath; + /** ZooKeeper paths for vertex input splits. */ + protected final InputSplitPaths vertexInputSplitsPaths; + /** ZooKeeper paths for edge input splits. */ + protected final InputSplitPaths edgeInputSplitsPaths; + /** Vertex input split events. */ + protected final InputSplitEvents vertexInputSplitsEvents; + /** Edge input split events. */ + protected final InputSplitEvents edgeInputSplitsEvents; + /** Path to the application attempts) */ + protected final String applicationAttemptsPath; + /** Path to the cleaned up notifications */ + protected final String cleanedUpPath; + /** Path to the checkpoint's root (including job id) */ + protected final String checkpointBasePath; + /** Path to the master election path */ + protected final String masterElectionPath; + /** Private ZooKeeper instance that implements the service */ + private final ZooKeeperExt zk; + /** Has the Connection occurred? */ + private final BspEvent connectedEvent; + /** Has worker registration changed (either healthy or unhealthy) */ + private final BspEvent workerHealthRegistrationChanged; + /** Are the addresses and partition assignments to workers ready? */ + private final BspEvent addressesAndPartitionsReadyChanged; + /** Application attempt changed */ + private final BspEvent applicationAttemptChanged; + /** Superstep finished synchronization */ + private final BspEvent superstepFinished; + /** Master election changed for any waited on attempt */ + private final BspEvent masterElectionChildrenChanged; + /** Cleaned up directory children changed*/ + private final BspEvent cleanedUpChildrenChanged; + /** Registered list of BspEvents */ + private final List<BspEvent> registeredBspEvents = + new ArrayList<BspEvent>(); + /** Immutable configuration of the job*/ + private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf; + /** Job context (mainly for progress) */ + private final Mapper<?, ?, ?, ?>.Context context; + /** Cached superstep (from ZooKeeper) */ + private long cachedSuperstep = UNSET_SUPERSTEP; + /** Restarted from a checkpoint (manual or automatic) */ + private long restartedSuperstep = UNSET_SUPERSTEP; + /** Cached application attempt (from ZooKeeper) */ + private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT; + /** Job id, to ensure uniqueness */ + private final String jobId; + /** Task partition, to ensure uniqueness */ + private final int taskPartition; + /** My hostname */ + private final String hostname; + /** Combination of hostname '_' partition (unique id) */ + private final String hostnamePartitionId; + /** Graph partitioner */ + private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory; + /** Mapper that will do the graph computation */ + private final GraphMapper<I, V, E, M> graphMapper; + /** File system */ + private final FileSystem fs; + /** Checkpoint frequency */ + private final int checkpointFrequency; + + /** + * Constructor. + * + * @param serverPortList ZooKeeper server port list + * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds + * @param context Mapper context + * @param graphMapper Graph mapper reference + */ + public BspService(String serverPortList, + int sessionMsecTimeout, + Mapper<?, ?, ?, ?>.Context context, + GraphMapper<I, V, E, M> graphMapper) { + this.vertexInputSplitsEvents = new InputSplitEvents(context); + this.edgeInputSplitsEvents = new InputSplitEvents(context); + this.connectedEvent = new PredicateLock(context); + this.workerHealthRegistrationChanged = new PredicateLock(context); + this.addressesAndPartitionsReadyChanged = new PredicateLock(context); + this.applicationAttemptChanged = new PredicateLock(context); + this.superstepFinished = new PredicateLock(context); + this.masterElectionChildrenChanged = new PredicateLock(context); + this.cleanedUpChildrenChanged = new PredicateLock(context); + + registerBspEvent(connectedEvent); + registerBspEvent(workerHealthRegistrationChanged); + registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged()); + registerBspEvent(vertexInputSplitsEvents.getStateChanged()); + registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged()); + registerBspEvent(edgeInputSplitsEvents.getStateChanged()); + registerBspEvent(addressesAndPartitionsReadyChanged); + registerBspEvent(applicationAttemptChanged); + registerBspEvent(superstepFinished); + registerBspEvent(masterElectionChildrenChanged); + registerBspEvent(cleanedUpChildrenChanged); + + this.context = context; + this.graphMapper = graphMapper; + this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>( + context.getConfiguration()); + this.jobId = conf.get("mapred.job.id", "Unknown Job"); + this.taskPartition = conf.getTaskPartition(); + this.restartedSuperstep = conf.getLong( + GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP); + this.cachedSuperstep = restartedSuperstep; + if ((restartedSuperstep != UNSET_SUPERSTEP) && + (restartedSuperstep < 0)) { + throw new IllegalArgumentException( + "BspService: Invalid superstep to restart - " + + restartedSuperstep); + } + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + this.hostnamePartitionId = hostname + "_" + getTaskPartition(); + this.graphPartitionerFactory = conf.createGraphPartitioner(); + + this.checkpointFrequency = conf.getCheckpointFrequency(); + + basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; + masterJobStatePath = basePath + MASTER_JOB_STATE_NODE; + vertexInputSplitsPaths = new InputSplitPaths(basePath, + VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR, + VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE); + edgeInputSplitsPaths = new InputSplitPaths(basePath, + EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR, + EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE); + applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; + cleanedUpPath = basePath + CLEANED_UP_DIR; + checkpointBasePath = getConfiguration().get( + GiraphConstants.CHECKPOINT_DIRECTORY, + GiraphConstants.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId()); + masterElectionPath = basePath + MASTER_ELECTION_DIR; + if (LOG.isInfoEnabled()) { + LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + + ", " + getTaskPartition() + " on " + serverPortList); + } + try { + this.zk = new ZooKeeperExt(serverPortList, + sessionMsecTimeout, + conf.getZookeeperOpsMaxAttempts(), + conf.getZookeeperOpsRetryWaitMsecs(), + this, + context); + connectedEvent.waitForever(); + this.fs = FileSystem.get(getConfiguration()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + /** + * Get the superstep from a ZooKeeper path + * + * @param path Path to parse for the superstep + * @return Superstep from the path. + */ + public static long getSuperstepFromPath(String path) { + int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR); + if (foundSuperstepStart == -1) { + throw new IllegalArgumentException( + "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR + + "from " + path); + } + foundSuperstepStart += SUPERSTEP_DIR.length() + 1; + int endIndex = foundSuperstepStart + + path.substring(foundSuperstepStart).indexOf("/"); + if (endIndex == -1) { + throw new IllegalArgumentException( + "getSuperstepFromPath: Cannot find end of superstep from " + + path); + } + if (LOG.isTraceEnabled()) { + LOG.trace("getSuperstepFromPath: Got path=" + path + + ", start=" + foundSuperstepStart + ", end=" + endIndex); + } + return Long.parseLong(path.substring(foundSuperstepStart, endIndex)); + } + + /** + * Get the hostname and id from a "healthy" worker path + * + * @param path Path to check + * @return Hostname and id from path + */ + public static String getHealthyHostnameIdFromPath(String path) { + int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR); + if (foundWorkerHealthyStart == -1) { + throw new IllegalArgumentException( + "getHealthyHostnameidFromPath: Couldn't find " + + WORKER_HEALTHY_DIR + " from " + path); + } + foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length(); + return path.substring(foundWorkerHealthyStart); + } + + /** + * Generate the base superstep directory path for a given application + * attempt + * + * @param attempt application attempt number + * @return directory path based on the an attempt + */ + public final String getSuperstepPath(long attempt) { + return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR; + } + + /** + * Generate the worker information "healthy" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getWorkerInfoHealthyPath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR; + } + + /** + * Generate the worker information "unhealthy" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getWorkerInfoUnhealthyPath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR; + } + + /** + * Generate the worker "wrote checkpoint" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getWorkerWroteCheckpointPath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR; + } + + /** + * Generate the worker "finished" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getWorkerFinishedPath(long attempt, long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR; + } + + /** + * Generate the "addresses and partitions" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getAddressesAndPartitionsPath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + ADDRESSES_AND_PARTITIONS_DIR; + } + + /** + * Generate the "partition exchange" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getPartitionExchangePath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR; + } + + /** + * Based on the superstep, worker info, and attempt, get the appropriate + * worker path for the exchange. + * + * @param attempt Application attempt + * @param superstep Superstep + * @param workerInfo Worker info of the exchange. + * @return Path of the desired worker + */ + public final String getPartitionExchangeWorkerPath(long attempt, + long superstep, + WorkerInfo workerInfo) { + return getPartitionExchangePath(attempt, superstep) + + "/" + workerInfo.getHostnameId(); + } + + /** + * Generate the "superstep finished" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getSuperstepFinishedPath(long attempt, long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE; + } + + /** + * Generate the base superstep directory path for a given application + * attempt + * + * @param superstep Superstep to use + * @return Directory path based on the a superstep + */ + public final String getCheckpointBasePath(long superstep) { + return checkpointBasePath + "/" + superstep; + } + + /** + * Get the checkpoint from a finalized checkpoint path + * + * @param finalizedPath Path of the finalized checkpoint + * @return Superstep referring to a checkpoint of the finalized path + */ + public static long getCheckpoint(Path finalizedPath) { + if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) { + throw new InvalidParameterException( + "getCheckpoint: " + finalizedPath + "Doesn't end in " + + CHECKPOINT_FINALIZED_POSTFIX); + } + String checkpointString = + finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, ""); + return Long.parseLong(checkpointString); + } + + /** + * Get the ZooKeeperExt instance. + * + * @return ZooKeeperExt instance. + */ + public final ZooKeeperExt getZkExt() { + return zk; + } + + @Override + public final long getRestartedSuperstep() { + return restartedSuperstep; + } + + /** + * Set the restarted superstep + * + * @param superstep Set the manually restarted superstep + */ + public final void setRestartedSuperstep(long superstep) { + if (superstep < INPUT_SUPERSTEP) { + throw new IllegalArgumentException( + "setRestartedSuperstep: Bad argument " + superstep); + } + restartedSuperstep = superstep; + } + + /** + * Should checkpoint on this superstep? If checkpointing, always + * checkpoint the first user superstep. If restarting, the first + * checkpoint is after the frequency has been met. + * + * @param superstep Decide if checkpointing no this superstep + * @return True if this superstep should be checkpointed, false otherwise + */ + public final boolean checkpointFrequencyMet(long superstep) { + if (checkpointFrequency == 0) { + return false; + } + long firstCheckpoint = INPUT_SUPERSTEP + 1; + if (getRestartedSuperstep() != UNSET_SUPERSTEP) { + firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; + } + if (superstep < firstCheckpoint) { + return false; + } + return ((superstep - firstCheckpoint) % checkpointFrequency) == 0; + } + + /** + * Get the file system + * + * @return file system + */ + public final FileSystem getFs() { + return fs; + } + + public final ImmutableClassesGiraphConfiguration<I, V, E, M> + getConfiguration() { + return conf; + } + + public final Mapper<?, ?, ?, ?>.Context getContext() { + return context; + } + + public final String getHostname() { + return hostname; + } + + public final String getHostnamePartitionId() { + return hostnamePartitionId; + } + + public final int getTaskPartition() { + return taskPartition; + } + + public final GraphMapper<I, V, E, M> getGraphMapper() { + return graphMapper; + } + + public final BspEvent getWorkerHealthRegistrationChangedEvent() { + return workerHealthRegistrationChanged; + } + + public final BspEvent getAddressesAndPartitionsReadyChangedEvent() { + return addressesAndPartitionsReadyChanged; + } + + + public final BspEvent getApplicationAttemptChangedEvent() { + return applicationAttemptChanged; + } + + public final BspEvent getSuperstepFinishedEvent() { + return superstepFinished; + } + + + public final BspEvent getMasterElectionChildrenChangedEvent() { + return masterElectionChildrenChanged; + } + + public final BspEvent getCleanedUpChildrenChangedEvent() { + return cleanedUpChildrenChanged; + } + + /** + * Get the master commanded job state as a JSONObject. Also sets the + * watches to see if the master commanded job state changes. + * + * @return Last job state or null if none + * @throws InterruptedException + * @throws KeeperException + */ + public final JSONObject getJobState() { + try { + getZkExt().createExt(masterJobStatePath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.info("getJobState: Job state already exists (" + + masterJobStatePath + ")"); + } catch (KeeperException e) { + throw new IllegalStateException("Failed to create job state path " + + "due to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Failed to create job state path " + + "due to InterruptedException", e); + } + String jobState = null; + try { + List<String> childList = + getZkExt().getChildrenExt( + masterJobStatePath, true, true, true); + if (childList.isEmpty()) { + return null; + } + jobState = + new String(getZkExt().getData( + childList.get(childList.size() - 1), true, null)); + } catch (KeeperException.NoNodeException e) { + LOG.info("getJobState: Job state path is empty! - " + + masterJobStatePath); + } catch (KeeperException e) { + throw new IllegalStateException("Failed to get job state path " + + "children due to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Failed to get job state path " + + "children due to InterruptedException", e); + } + try { + return new JSONObject(jobState); + } catch (JSONException e) { + throw new RuntimeException( + "getJobState: Failed to parse job state " + jobState); + } + } + + /** + * Get the job id + * + * @return job id + */ + public final String getJobId() { + return jobId; + } + + /** + * Get the latest application attempt and cache it. + * + * @return the latest application attempt + */ + public final long getApplicationAttempt() { + if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) { + return cachedApplicationAttempt; + } + try { + getZkExt().createExt(applicationAttemptsPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.info("getApplicationAttempt: Node " + + applicationAttemptsPath + " already exists!"); + } catch (KeeperException e) { + throw new IllegalStateException("Couldn't create application " + + "attempts path due to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Couldn't create application " + + "attempts path due to InterruptedException", e); + } + try { + List<String> attemptList = + getZkExt().getChildrenExt( + applicationAttemptsPath, true, false, false); + if (attemptList.isEmpty()) { + cachedApplicationAttempt = 0; + } else { + cachedApplicationAttempt = + Long.parseLong(Collections.max(attemptList)); + } + } catch (KeeperException e) { + throw new IllegalStateException("Couldn't get application " + + "attempts to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Couldn't get application " + + "attempts to InterruptedException", e); + } + + return cachedApplicationAttempt; + } + + /** + * Get the latest superstep and cache it. + * + * @return the latest superstep + * @throws InterruptedException + * @throws KeeperException + */ + public final long getSuperstep() { + if (cachedSuperstep != UNSET_SUPERSTEP) { + return cachedSuperstep; + } + String superstepPath = getSuperstepPath(getApplicationAttempt()); + try { + getZkExt().createExt(superstepPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isInfoEnabled()) { + LOG.info("getApplicationAttempt: Node " + + applicationAttemptsPath + " already exists!"); + } + } catch (KeeperException e) { + throw new IllegalStateException( + "getSuperstep: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getSuperstep: InterruptedException", e); + } + + List<String> superstepList; + try { + superstepList = + getZkExt().getChildrenExt(superstepPath, true, false, false); + } catch (KeeperException e) { + throw new IllegalStateException( + "getSuperstep: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getSuperstep: InterruptedException", e); + } + if (superstepList.isEmpty()) { + cachedSuperstep = INPUT_SUPERSTEP; + } else { + cachedSuperstep = + Long.parseLong(Collections.max(superstepList)); + } + + return cachedSuperstep; + } + + /** + * Increment the cached superstep. Shouldn't be the initial value anymore. + */ + public final void incrCachedSuperstep() { + if (cachedSuperstep == UNSET_SUPERSTEP) { + throw new IllegalStateException( + "incrSuperstep: Invalid unset cached superstep " + + UNSET_SUPERSTEP); + } + ++cachedSuperstep; + } + + /** + * Set the cached superstep (should only be used for loading checkpoints + * or recovering from failure). + * + * @param superstep will be used as the next superstep iteration + */ + public final void setCachedSuperstep(long superstep) { + cachedSuperstep = superstep; + } + + /** + * Set the cached application attempt (should only be used for restart from + * failure by the master) + * + * @param applicationAttempt Will denote the new application attempt + */ + public final void setApplicationAttempt(long applicationAttempt) { + cachedApplicationAttempt = applicationAttempt; + String superstepPath = getSuperstepPath(cachedApplicationAttempt); + try { + getZkExt().createExt(superstepPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + throw new IllegalArgumentException( + "setApplicationAttempt: Attempt already exists! - " + + superstepPath, e); + } catch (KeeperException e) { + throw new RuntimeException( + "setApplicationAttempt: KeeperException - " + + superstepPath, e); + } catch (InterruptedException e) { + throw new RuntimeException( + "setApplicationAttempt: InterruptedException - " + + superstepPath, e); + } + } + + /** + * Register a BspEvent. Ensure that it will be signaled + * by catastrophic failure so that threads waiting on an event signal + * will be unblocked. + * + * @param event Event to be registered. + */ + public void registerBspEvent(BspEvent event) { + registeredBspEvents.add(event); + } + + /** + * Subclasses can use this to instantiate their respective partitioners + * + * @return Instantiated graph partitioner factory + */ + protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() { + return graphPartitionerFactory; + } + + /** + * Derived classes that want additional ZooKeeper events to take action + * should override this. + * + * @param event Event that occurred + * @return true if the event was processed here, false otherwise + */ + protected boolean processEvent(WatchedEvent event) { + return false; + } + + @Override + public final void process(WatchedEvent event) { + // 1. Process all shared events + // 2. Process specific derived class events + if (LOG.isDebugEnabled()) { + LOG.debug("process: Got a new event, path = " + event.getPath() + + ", type = " + event.getType() + ", state = " + + event.getState()); + } + + if ((event.getPath() == null) && (event.getType() == EventType.None)) { + if (event.getState() == KeeperState.Disconnected) { + // Watches may not be triggered for some time, so signal all BspEvents + for (BspEvent bspEvent : registeredBspEvents) { + bspEvent.signal(); + } + LOG.warn("process: Disconnected from ZooKeeper (will automatically " + + "try to recover) " + event); + } else if (event.getState() == KeeperState.SyncConnected) { + if (LOG.isInfoEnabled()) { + LOG.info("process: Asynchronous connection complete."); + } + connectedEvent.signal(); + } else { + LOG.warn("process: Got unknown null path event " + event); + } + return; + } + + boolean eventProcessed = false; + if (event.getPath().startsWith(masterJobStatePath)) { + // This will cause all becomeMaster() MasterThreads to notice the + // change in job state and quit trying to become the master. + masterElectionChildrenChanged.signal(); + eventProcessed = true; + } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) || + event.getPath().contains(WORKER_UNHEALTHY_DIR)) && + (event.getType() == EventType.NodeChildrenChanged)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: workerHealthRegistrationChanged " + + "(worker health reported - healthy/unhealthy )"); + } + workerHealthRegistrationChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals( + vertexInputSplitsPaths.getAllReadyPath()) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: inputSplitsReadyChanged " + + "(input splits ready)"); + } + vertexInputSplitsEvents.getAllReadyChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: vertexInputSplitsStateChanged " + + "(made a reservation)"); + } + vertexInputSplitsEvents.getStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeDeleted)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: vertexInputSplitsStateChanged " + + "(lost a reservation)"); + } + vertexInputSplitsEvents.getStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: vertexInputSplitsStateChanged " + + "(finished inputsplit)"); + } + vertexInputSplitsEvents.getStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) && + (event.getType() == EventType.NodeChildrenChanged)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: vertexInputSplitsDoneStateChanged " + + "(worker finished sending)"); + } + vertexInputSplitsEvents.getDoneStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().equals( + vertexInputSplitsPaths.getAllDonePath()) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: vertexInputSplitsAllDoneChanged " + + "(all vertices sent from input splits)"); + } + vertexInputSplitsEvents.getAllDoneChanged().signal(); + eventProcessed = true; + } else if (event.getPath().equals( + edgeInputSplitsPaths.getAllReadyPath()) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: edgeInputSplitsReadyChanged " + + "(input splits ready)"); + } + edgeInputSplitsEvents.getAllReadyChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: edgeInputSplitsStateChanged " + + "(made a reservation)"); + } + edgeInputSplitsEvents.getStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeDeleted)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: edgeInputSplitsStateChanged " + + "(lost a reservation)"); + } + edgeInputSplitsEvents.getStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: edgeInputSplitsStateChanged " + + "(finished inputsplit)"); + } + edgeInputSplitsEvents.getStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) && + (event.getType() == EventType.NodeChildrenChanged)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: edgeInputSplitsDoneStateChanged " + + "(worker finished sending)"); + } + edgeInputSplitsEvents.getDoneStateChanged().signal(); + eventProcessed = true; + } else if (event.getPath().equals( + edgeInputSplitsPaths.getAllDonePath()) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: edgeInputSplitsAllDoneChanged " + + "(all vertices sent from input splits)"); + } + edgeInputSplitsEvents.getAllDoneChanged().signal(); + eventProcessed = true; + } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: partitionAssignmentsReadyChanged " + + "(partitions are assigned)"); + } + addressesAndPartitionsReadyChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: superstepFinished signaled"); + } + superstepFinished.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(applicationAttemptsPath) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: applicationAttemptChanged signaled"); + } + applicationAttemptChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(MASTER_ELECTION_DIR) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: masterElectionChildrenChanged signaled"); + } + masterElectionChildrenChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals(cleanedUpPath) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: cleanedUpChildrenChanged signaled"); + } + cleanedUpChildrenChanged.signal(); + eventProcessed = true; + } + + if (!(processEvent(event)) && (!eventProcessed)) { + LOG.warn("process: Unknown and unprocessed event (path=" + + event.getPath() + ", type=" + event.getType() + + ", state=" + event.getState() + ")"); + } + } +}
