http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java deleted file mode 100644 index b6e8f0e..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/BspService.java +++ /dev/null @@ -1,1069 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.graph; - -import org.apache.giraph.bsp.CentralizedService; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.partition.GraphPartitionerFactory; -import org.apache.giraph.zk.BspEvent; -import org.apache.giraph.zk.PredicateLock; -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.giraph.zk.ZooKeeperManager; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooDefs.Ids; -import org.json.JSONException; -import org.json.JSONObject; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Zookeeper-based implementation of {@link CentralizedService}. 
- * - * @param <I> Vertex id - * @param <V> Vertex data - * @param <E> Edge data - * @param <M> Message data - */ -@SuppressWarnings("rawtypes") -public abstract class BspService<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> - implements Watcher, CentralizedService<I, V, E, M> { - /** Unset superstep */ - public static final long UNSET_SUPERSTEP = Long.MIN_VALUE; - /** Input superstep (superstep when loading the vertices happens) */ - public static final long INPUT_SUPERSTEP = -1; - /** Unset application attempt */ - public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE; - /** Base ZooKeeper directory */ - public static final String BASE_DIR = "/_hadoopBsp"; - /** Master job state znode above base dir */ - public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; - /** Vertex input split directory about base dir */ - public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir"; - /** Vertex input split done directory about base dir */ - public static final String VERTEX_INPUT_SPLIT_DONE_DIR = - "/_vertexInputSplitDoneDir"; - /** Denotes a reserved vertex input split */ - public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE = - "/_vertexInputSplitReserved"; - /** Denotes a finished vertex input split */ - public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE = - "/_vertexInputSplitFinished"; - /** Denotes that all the vertex input splits are ready for consumption */ - public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE = - "/_vertexInputSplitsAllReady"; - /** Denotes that all the vertex input splits are done. 
*/ - public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE = - "/_vertexInputSplitsAllDone"; - /** Edge input split directory about base dir */ - public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir"; - /** Edge input split done directory about base dir */ - public static final String EDGE_INPUT_SPLIT_DONE_DIR = - "/_edgeInputSplitDoneDir"; - /** Denotes a reserved edge input split */ - public static final String EDGE_INPUT_SPLIT_RESERVED_NODE = - "/_edgeInputSplitReserved"; - /** Denotes a finished edge input split */ - public static final String EDGE_INPUT_SPLIT_FINISHED_NODE = - "/_edgeInputSplitFinished"; - /** Denotes that all the edge input splits are ready for consumption */ - public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE = - "/_edgeInputSplitsAllReady"; - /** Denotes that all the edge input splits are done. */ - public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE = - "/_edgeInputSplitsAllDone"; - /** Directory of attempts of this application */ - public static final String APPLICATION_ATTEMPTS_DIR = - "/_applicationAttemptsDir"; - /** Where the master election happens */ - public static final String MASTER_ELECTION_DIR = "/_masterElectionDir"; - /** Superstep scope */ - public static final String SUPERSTEP_DIR = "/_superstepDir"; - /** Where the merged aggregators are located */ - public static final String MERGED_AGGREGATOR_DIR = - "/_mergedAggregatorDir"; - /** Healthy workers register here. */ - public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir"; - /** Unhealthy workers register here. 
*/ - public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir"; - /** Workers which wrote checkpoint notify here */ - public static final String WORKER_WROTE_CHECKPOINT_DIR = - "/_workerWroteCheckpointDir"; - /** Finished workers notify here */ - public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir"; - /** Where the master and worker addresses and partition assignments are set */ - public static final String ADDRESSES_AND_PARTITIONS_DIR = - "/_addressesAndPartitions"; - /** Helps coordinate the partition exchanges */ - public static final String PARTITION_EXCHANGE_DIR = - "/_partitionExchangeDir"; - /** Denotes that the superstep is done */ - public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; - /** Denotes which workers have been cleaned up */ - public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; - /** JSON partition stats key */ - public static final String JSONOBJ_PARTITION_STATS_KEY = - "_partitionStatsKey"; - /** JSON message count key */ - public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey"; - /** JSON metrics key */ - public static final String JSONOBJ_METRICS_KEY = "_metricsKey"; - - /** JSON state key */ - public static final String JSONOBJ_STATE_KEY = "_stateKey"; - /** JSON application attempt key */ - public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY = - "_applicationAttemptKey"; - /** JSON superstep key */ - public static final String JSONOBJ_SUPERSTEP_KEY = - "_superstepKey"; - /** Suffix denotes a worker */ - public static final String WORKER_SUFFIX = "_worker"; - /** Suffix denotes a master */ - public static final String MASTER_SUFFIX = "_master"; - /** If at the end of a checkpoint file, indicates metadata */ - public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata"; - /** - * If at the end of a checkpoint file, indicates vertices, edges, - * messages, etc. 
- */ - public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices"; - /** - * If at the end of a checkpoint file, indicates metadata and data is valid - * for the same filenames without .valid - */ - public static final String CHECKPOINT_VALID_POSTFIX = ".valid"; - /** - * If at the end of a checkpoint file, indicates the stitched checkpoint - * file prefixes. A checkpoint is not valid if this file does not exist. - */ - public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized"; - /** Class logger */ - private static final Logger LOG = Logger.getLogger(BspService.class); - /** Path to the job's root */ - protected final String basePath; - /** Path to the job state determined by the master (informative only) */ - protected final String masterJobStatePath; - /** ZooKeeper paths for vertex input splits. */ - protected final InputSplitPaths vertexInputSplitsPaths; - /** ZooKeeper paths for edge input splits. */ - protected final InputSplitPaths edgeInputSplitsPaths; - /** Vertex input split events. */ - protected final InputSplitEvents vertexInputSplitsEvents; - /** Edge input split events. */ - protected final InputSplitEvents edgeInputSplitsEvents; - /** Path to the application attempts) */ - protected final String applicationAttemptsPath; - /** Path to the cleaned up notifications */ - protected final String cleanedUpPath; - /** Path to the checkpoint's root (including job id) */ - protected final String checkpointBasePath; - /** Path to the master election path */ - protected final String masterElectionPath; - /** Private ZooKeeper instance that implements the service */ - private final ZooKeeperExt zk; - /** Has the Connection occurred? */ - private final BspEvent connectedEvent; - /** Has worker registration changed (either healthy or unhealthy) */ - private final BspEvent workerHealthRegistrationChanged; - /** Are the addresses and partition assignments to workers ready? 
*/ - private final BspEvent addressesAndPartitionsReadyChanged; - /** Application attempt changed */ - private final BspEvent applicationAttemptChanged; - /** Superstep finished synchronization */ - private final BspEvent superstepFinished; - /** Master election changed for any waited on attempt */ - private final BspEvent masterElectionChildrenChanged; - /** Cleaned up directory children changed*/ - private final BspEvent cleanedUpChildrenChanged; - /** Registered list of BspEvents */ - private final List<BspEvent> registeredBspEvents = - new ArrayList<BspEvent>(); - /** Immutable configuration of the job*/ - private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf; - /** Job context (mainly for progress) */ - private final Mapper<?, ?, ?, ?>.Context context; - /** Cached superstep (from ZooKeeper) */ - private long cachedSuperstep = UNSET_SUPERSTEP; - /** Restarted from a checkpoint (manual or automatic) */ - private long restartedSuperstep = UNSET_SUPERSTEP; - /** Cached application attempt (from ZooKeeper) */ - private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT; - /** Job id, to ensure uniqueness */ - private final String jobId; - /** Task partition, to ensure uniqueness */ - private final int taskPartition; - /** My hostname */ - private final String hostname; - /** Combination of hostname '_' partition (unique id) */ - private final String hostnamePartitionId; - /** Graph partitioner */ - private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory; - /** Mapper that will do the graph computation */ - private final GraphMapper<I, V, E, M> graphMapper; - /** File system */ - private final FileSystem fs; - /** Checkpoint frequency */ - private final int checkpointFrequency; - - /** - * Constructor. 
- * - * @param serverPortList ZooKeeper server port list - * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds - * @param context Mapper context - * @param graphMapper Graph mapper reference - */ - public BspService(String serverPortList, - int sessionMsecTimeout, - Mapper<?, ?, ?, ?>.Context context, - GraphMapper<I, V, E, M> graphMapper) { - this.vertexInputSplitsEvents = new InputSplitEvents(context); - this.edgeInputSplitsEvents = new InputSplitEvents(context); - this.connectedEvent = new PredicateLock(context); - this.workerHealthRegistrationChanged = new PredicateLock(context); - this.addressesAndPartitionsReadyChanged = new PredicateLock(context); - this.applicationAttemptChanged = new PredicateLock(context); - this.superstepFinished = new PredicateLock(context); - this.masterElectionChildrenChanged = new PredicateLock(context); - this.cleanedUpChildrenChanged = new PredicateLock(context); - - registerBspEvent(connectedEvent); - registerBspEvent(workerHealthRegistrationChanged); - registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged()); - registerBspEvent(vertexInputSplitsEvents.getStateChanged()); - registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged()); - registerBspEvent(edgeInputSplitsEvents.getStateChanged()); - registerBspEvent(addressesAndPartitionsReadyChanged); - registerBspEvent(applicationAttemptChanged); - registerBspEvent(superstepFinished); - registerBspEvent(masterElectionChildrenChanged); - registerBspEvent(cleanedUpChildrenChanged); - - this.context = context; - this.graphMapper = graphMapper; - this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>( - context.getConfiguration()); - this.jobId = conf.get("mapred.job.id", "Unknown Job"); - this.taskPartition = conf.getTaskPartition(); - this.restartedSuperstep = conf.getLong( - GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP); - this.cachedSuperstep = restartedSuperstep; - if ((restartedSuperstep != UNSET_SUPERSTEP) && - (restartedSuperstep < 0)) { 
- throw new IllegalArgumentException( - "BspService: Invalid superstep to restart - " + - restartedSuperstep); - } - try { - this.hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - this.hostnamePartitionId = hostname + "_" + getTaskPartition(); - this.graphPartitionerFactory = conf.createGraphPartitioner(); - - this.checkpointFrequency = conf.getCheckpointFrequency(); - - basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; - masterJobStatePath = basePath + MASTER_JOB_STATE_NODE; - vertexInputSplitsPaths = new InputSplitPaths(basePath, - VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR, - VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE); - edgeInputSplitsPaths = new InputSplitPaths(basePath, - EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR, - EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE); - applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; - cleanedUpPath = basePath + CLEANED_UP_DIR; - checkpointBasePath = getConfiguration().get( - GiraphConstants.CHECKPOINT_DIRECTORY, - GiraphConstants.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId()); - masterElectionPath = basePath + MASTER_ELECTION_DIR; - if (LOG.isInfoEnabled()) { - LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + - ", " + getTaskPartition() + " on " + serverPortList); - } - try { - this.zk = new ZooKeeperExt(serverPortList, - sessionMsecTimeout, - conf.getZookeeperOpsMaxAttempts(), - conf.getZookeeperOpsRetryWaitMsecs(), - this, - context); - connectedEvent.waitForever(); - this.fs = FileSystem.get(getConfiguration()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - - /** - * Get the superstep from a ZooKeeper path - * - * @param path Path to parse for the superstep - * @return Superstep from the path. 
- */ - public static long getSuperstepFromPath(String path) { - int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR); - if (foundSuperstepStart == -1) { - throw new IllegalArgumentException( - "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR + - "from " + path); - } - foundSuperstepStart += SUPERSTEP_DIR.length() + 1; - int endIndex = foundSuperstepStart + - path.substring(foundSuperstepStart).indexOf("/"); - if (endIndex == -1) { - throw new IllegalArgumentException( - "getSuperstepFromPath: Cannot find end of superstep from " + - path); - } - if (LOG.isTraceEnabled()) { - LOG.trace("getSuperstepFromPath: Got path=" + path + - ", start=" + foundSuperstepStart + ", end=" + endIndex); - } - return Long.parseLong(path.substring(foundSuperstepStart, endIndex)); - } - - /** - * Get the hostname and id from a "healthy" worker path - * - * @param path Path to check - * @return Hostname and id from path - */ - public static String getHealthyHostnameIdFromPath(String path) { - int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR); - if (foundWorkerHealthyStart == -1) { - throw new IllegalArgumentException( - "getHealthyHostnameidFromPath: Couldn't find " + - WORKER_HEALTHY_DIR + " from " + path); - } - foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length(); - return path.substring(foundWorkerHealthyStart); - } - - /** - * Generate the base superstep directory path for a given application - * attempt - * - * @param attempt application attempt number - * @return directory path based on the an attempt - */ - public final String getSuperstepPath(long attempt) { - return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR; - } - - /** - * Generate the worker information "healthy" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - public final String getWorkerInfoHealthyPath(long attempt, - long superstep) { - return 
applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR; - } - - /** - * Generate the worker information "unhealthy" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - public final String getWorkerInfoUnhealthyPath(long attempt, - long superstep) { - return applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR; - } - - /** - * Generate the worker "wrote checkpoint" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - public final String getWorkerWroteCheckpointPath(long attempt, - long superstep) { - return applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR; - } - - /** - * Generate the worker "finished" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - public final String getWorkerFinishedPath(long attempt, long superstep) { - return applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR; - } - - /** - * Generate the "addresses and partitions" directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - public final String getAddressesAndPartitionsPath(long attempt, - long superstep) { - return applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + ADDRESSES_AND_PARTITIONS_DIR; - } - - /** - * Generate the "partition exchange" directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return 
directory path based on the a superstep - */ - public final String getPartitionExchangePath(long attempt, - long superstep) { - return applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR; - } - - /** - * Based on the superstep, worker info, and attempt, get the appropriate - * worker path for the exchange. - * - * @param attempt Application attempt - * @param superstep Superstep - * @param workerInfo Worker info of the exchange. - * @return Path of the desired worker - */ - public final String getPartitionExchangeWorkerPath(long attempt, - long superstep, - WorkerInfo workerInfo) { - return getPartitionExchangePath(attempt, superstep) + - "/" + workerInfo.getHostnameId(); - } - - /** - * Generate the "superstep finished" directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - public final String getSuperstepFinishedPath(long attempt, long superstep) { - return applicationAttemptsPath + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE; - } - - /** - * Generate the base superstep directory path for a given application - * attempt - * - * @param superstep Superstep to use - * @return Directory path based on the a superstep - */ - public final String getCheckpointBasePath(long superstep) { - return checkpointBasePath + "/" + superstep; - } - - /** - * Get the checkpoint from a finalized checkpoint path - * - * @param finalizedPath Path of the finalized checkpoint - * @return Superstep referring to a checkpoint of the finalized path - */ - public static long getCheckpoint(Path finalizedPath) { - if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) { - throw new InvalidParameterException( - "getCheckpoint: " + finalizedPath + "Doesn't end in " + - CHECKPOINT_FINALIZED_POSTFIX); - } - String checkpointString = - 
finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, ""); - return Long.parseLong(checkpointString); - } - - /** - * Get the ZooKeeperExt instance. - * - * @return ZooKeeperExt instance. - */ - public final ZooKeeperExt getZkExt() { - return zk; - } - - @Override - public final long getRestartedSuperstep() { - return restartedSuperstep; - } - - /** - * Set the restarted superstep - * - * @param superstep Set the manually restarted superstep - */ - public final void setRestartedSuperstep(long superstep) { - if (superstep < INPUT_SUPERSTEP) { - throw new IllegalArgumentException( - "setRestartedSuperstep: Bad argument " + superstep); - } - restartedSuperstep = superstep; - } - - /** - * Should checkpoint on this superstep? If checkpointing, always - * checkpoint the first user superstep. If restarting, the first - * checkpoint is after the frequency has been met. - * - * @param superstep Decide if checkpointing on this superstep - * @return True if this superstep should be checkpointed, false otherwise - */ - public final boolean checkpointFrequencyMet(long superstep) { - if (checkpointFrequency == 0) { - return false; - } - long firstCheckpoint = INPUT_SUPERSTEP + 1; - if (getRestartedSuperstep() != UNSET_SUPERSTEP) { - firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; - } - if (superstep < firstCheckpoint) { - return false; - } - return ((superstep - firstCheckpoint) % checkpointFrequency) == 0; - } - - /** - * Get the file system - * - * @return file system - */ - public final FileSystem getFs() { - return fs; - } - - public final ImmutableClassesGiraphConfiguration<I, V, E, M> - getConfiguration() { - return conf; - } - - public final Mapper<?, ?, ?, ?>.Context getContext() { - return context; - } - - public final String getHostname() { - return hostname; - } - - public final String getHostnamePartitionId() { - return hostnamePartitionId; - } - - public final int getTaskPartition() { - return taskPartition; - } - - public final 
GraphMapper<I, V, E, M> getGraphMapper() { - return graphMapper; - } - - public final BspEvent getWorkerHealthRegistrationChangedEvent() { - return workerHealthRegistrationChanged; - } - - public final BspEvent getAddressesAndPartitionsReadyChangedEvent() { - return addressesAndPartitionsReadyChanged; - } - - - public final BspEvent getApplicationAttemptChangedEvent() { - return applicationAttemptChanged; - } - - public final BspEvent getSuperstepFinishedEvent() { - return superstepFinished; - } - - - public final BspEvent getMasterElectionChildrenChangedEvent() { - return masterElectionChildrenChanged; - } - - public final BspEvent getCleanedUpChildrenChangedEvent() { - return cleanedUpChildrenChanged; - } - - /** - * Get the master commanded job state as a JSONObject. Also sets the - * watches to see if the master commanded job state changes. - * - * @return Last job state or null if none - * @throws IllegalStateException if the ZooKeeper calls fail or are - * interrupted - */ - public final JSONObject getJobState() { - try { - getZkExt().createExt(masterJobStatePath, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - LOG.info("getJobState: Job state already exists (" + - masterJobStatePath + ")"); - } catch (KeeperException e) { - throw new IllegalStateException("Failed to create job state path " + - "due to KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException("Failed to create job state path " + - "due to InterruptedException", e); - } - String jobState = null; - try { - List<String> childList = - getZkExt().getChildrenExt( - masterJobStatePath, true, true, true); - if (childList.isEmpty()) { - return null; - } - jobState = - new String(getZkExt().getData( - childList.get(childList.size() - 1), true, null)); - } catch (KeeperException.NoNodeException e) { - LOG.info("getJobState: Job state path is empty! 
- " + - masterJobStatePath); - } catch (KeeperException e) { - throw new IllegalStateException("Failed to get job state path " + - "children due to KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException("Failed to get job state path " + - "children due to InterruptedException", e); - } - // The NoNodeException branch above leaves jobState unset; report - // "no state" as documented instead of passing null to the JSON parser - if (jobState == null) { - return null; - } - try { - return new JSONObject(jobState); - } catch (JSONException e) { - throw new RuntimeException( - "getJobState: Failed to parse job state " + jobState, e); - } - } - - /** - * Get the job id - * - * @return job id - */ - public final String getJobId() { - return jobId; - } - - /** - * Get the latest application attempt and cache it. - * - * @return the latest application attempt - */ - public final long getApplicationAttempt() { - if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) { - return cachedApplicationAttempt; - } - try { - getZkExt().createExt(applicationAttemptsPath, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - LOG.info("getApplicationAttempt: Node " + - applicationAttemptsPath + " already exists!"); - } catch (KeeperException e) { - throw new IllegalStateException("Couldn't create application " + - "attempts path due to KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException("Couldn't create application " + - "attempts path due to InterruptedException", e); - } - try { - List<String> attemptList = - getZkExt().getChildrenExt( - applicationAttemptsPath, true, false, false); - if (attemptList.isEmpty()) { - cachedApplicationAttempt = 0; - } else { - // Compare numerically; String ordering would rank "9" above "10" - long latestAttempt = Long.MIN_VALUE; - for (String attempt : attemptList) { - latestAttempt = - Math.max(latestAttempt, Long.parseLong(attempt)); - } - cachedApplicationAttempt = latestAttempt; - } - } catch (KeeperException e) { - throw new IllegalStateException("Couldn't get application " + - "attempts to KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException("Couldn't get application " + - "attempts to InterruptedException", e); - } - - return 
cachedApplicationAttempt; - } - - /** - * Get the latest superstep and cache it. - * - * @return the latest superstep - * @throws IllegalStateException if the ZooKeeper calls fail or are - * interrupted - */ - public final long getSuperstep() { - if (cachedSuperstep != UNSET_SUPERSTEP) { - return cachedSuperstep; - } - String superstepPath = getSuperstepPath(getApplicationAttempt()); - try { - getZkExt().createExt(superstepPath, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - if (LOG.isInfoEnabled()) { - LOG.info("getSuperstep: Node " + - superstepPath + " already exists!"); - } - } catch (KeeperException e) { - throw new IllegalStateException( - "getSuperstep: KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "getSuperstep: InterruptedException", e); - } - - List<String> superstepList; - try { - superstepList = - getZkExt().getChildrenExt(superstepPath, true, false, false); - } catch (KeeperException e) { - throw new IllegalStateException( - "getSuperstep: KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "getSuperstep: InterruptedException", e); - } - if (superstepList.isEmpty()) { - cachedSuperstep = INPUT_SUPERSTEP; - } else { - // Compare numerically; String ordering would rank "9" above "10" - long latestSuperstep = Long.MIN_VALUE; - for (String superstep : superstepList) { - latestSuperstep = - Math.max(latestSuperstep, Long.parseLong(superstep)); - } - cachedSuperstep = latestSuperstep; - } - - return cachedSuperstep; - } - - /** - * Increment the cached superstep. Shouldn't be the initial value anymore. - */ - public final void incrCachedSuperstep() { - if (cachedSuperstep == UNSET_SUPERSTEP) { - throw new IllegalStateException( - "incrSuperstep: Invalid unset cached superstep " + - UNSET_SUPERSTEP); - } - ++cachedSuperstep; - } - - /** - * Set the cached superstep (should only be used for loading checkpoints - * or recovering from failure). 
- * - * @param superstep will be used as the next superstep iteration - */ - public final void setCachedSuperstep(long superstep) { - cachedSuperstep = superstep; - } - - /** - * Set the cached application attempt (should only be used for restart from - * failure by the master) - * - * @param applicationAttempt Will denote the new application attempt - */ - public final void setApplicationAttempt(long applicationAttempt) { - cachedApplicationAttempt = applicationAttempt; - String superstepPath = getSuperstepPath(cachedApplicationAttempt); - try { - getZkExt().createExt(superstepPath, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - throw new IllegalArgumentException( - "setApplicationAttempt: Attempt already exists! - " + - superstepPath, e); - } catch (KeeperException e) { - throw new RuntimeException( - "setApplicationAttempt: KeeperException - " + - superstepPath, e); - } catch (InterruptedException e) { - throw new RuntimeException( - "setApplicationAttempt: InterruptedException - " + - superstepPath, e); - } - } - - /** - * Register a BspEvent. Ensure that it will be signaled - * by catastrophic failure so that threads waiting on an event signal - * will be unblocked. - * - * @param event Event to be registered. - */ - public void registerBspEvent(BspEvent event) { - registeredBspEvents.add(event); - } - - /** - * Subclasses can use this to instantiate their respective partitioners - * - * @return Instantiated graph partitioner factory - */ - protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() { - return graphPartitionerFactory; - } - - /** - * Derived classes that want additional ZooKeeper events to take action - * should override this. 
- * - * @param event Event that occurred - * @return true if the event was processed here, false otherwise - */ - protected boolean processEvent(WatchedEvent event) { - return false; - } - - @Override - public final void process(WatchedEvent event) { - // 1. Process all shared events - // 2. Process specific derived class events - if (LOG.isDebugEnabled()) { - LOG.debug("process: Got a new event, path = " + event.getPath() + - ", type = " + event.getType() + ", state = " + - event.getState()); - } - - if ((event.getPath() == null) && (event.getType() == EventType.None)) { - if (event.getState() == KeeperState.Disconnected) { - // Watches may not be triggered for some time, so signal all BspEvents - for (BspEvent bspEvent : registeredBspEvents) { - bspEvent.signal(); - } - LOG.warn("process: Disconnected from ZooKeeper (will automatically " + - "try to recover) " + event); - } else if (event.getState() == KeeperState.SyncConnected) { - if (LOG.isInfoEnabled()) { - LOG.info("process: Asynchronous connection complete."); - } - connectedEvent.signal(); - } else { - LOG.warn("process: Got unknown null path event " + event); - } - return; - } - - boolean eventProcessed = false; - if (event.getPath().startsWith(masterJobStatePath)) { - // This will cause all becomeMaster() MasterThreads to notice the - // change in job state and quit trying to become the master. 
- masterElectionChildrenChanged.signal(); - eventProcessed = true; - } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) || - event.getPath().contains(WORKER_UNHEALTHY_DIR)) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: workerHealthRegistrationChanged " + - "(worker health reported - healthy/unhealthy )"); - } - workerHealthRegistrationChanged.signal(); - eventProcessed = true; - } else if (event.getPath().equals( - vertexInputSplitsPaths.getAllReadyPath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: inputSplitsReadyChanged " + - "(input splits ready)"); - } - vertexInputSplitsEvents.getAllReadyChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: vertexInputSplitsStateChanged " + - "(made a reservation)"); - } - vertexInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeDeleted)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: vertexInputSplitsStateChanged " + - "(lost a reservation)"); - } - vertexInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: vertexInputSplitsStateChanged " + - "(finished inputsplit)"); - } - vertexInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: vertexInputSplitsDoneStateChanged " + - "(worker finished sending)"); - } - 
vertexInputSplitsEvents.getDoneStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().equals( - vertexInputSplitsPaths.getAllDonePath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: vertexInputSplitsAllDoneChanged " + - "(all vertices sent from input splits)"); - } - vertexInputSplitsEvents.getAllDoneChanged().signal(); - eventProcessed = true; - } else if (event.getPath().equals( - edgeInputSplitsPaths.getAllReadyPath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: edgeInputSplitsReadyChanged " + - "(input splits ready)"); - } - edgeInputSplitsEvents.getAllReadyChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: edgeInputSplitsStateChanged " + - "(made a reservation)"); - } - edgeInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeDeleted)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: edgeInputSplitsStateChanged " + - "(lost a reservation)"); - } - edgeInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: edgeInputSplitsStateChanged " + - "(finished inputsplit)"); - } - edgeInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: edgeInputSplitsDoneStateChanged " + - "(worker finished sending)"); - } - edgeInputSplitsEvents.getDoneStateChanged().signal(); - 
eventProcessed = true; - } else if (event.getPath().equals( - edgeInputSplitsPaths.getAllDonePath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: edgeInputSplitsAllDoneChanged " + - "(all vertices sent from input splits)"); - } - edgeInputSplitsEvents.getAllDoneChanged().signal(); - eventProcessed = true; - } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) && - event.getType() == EventType.NodeCreated) { - if (LOG.isInfoEnabled()) { - LOG.info("process: partitionAssignmentsReadyChanged " + - "(partitions are assigned)"); - } - addressesAndPartitionsReadyChanged.signal(); - eventProcessed = true; - } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) && - event.getType() == EventType.NodeCreated) { - if (LOG.isInfoEnabled()) { - LOG.info("process: superstepFinished signaled"); - } - superstepFinished.signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(applicationAttemptsPath) && - event.getType() == EventType.NodeChildrenChanged) { - if (LOG.isInfoEnabled()) { - LOG.info("process: applicationAttemptChanged signaled"); - } - applicationAttemptChanged.signal(); - eventProcessed = true; - } else if (event.getPath().contains(MASTER_ELECTION_DIR) && - event.getType() == EventType.NodeChildrenChanged) { - if (LOG.isInfoEnabled()) { - LOG.info("process: masterElectionChildrenChanged signaled"); - } - masterElectionChildrenChanged.signal(); - eventProcessed = true; - } else if (event.getPath().equals(cleanedUpPath) && - event.getType() == EventType.NodeChildrenChanged) { - if (LOG.isInfoEnabled()) { - LOG.info("process: cleanedUpChildrenChanged signaled"); - } - cleanedUpChildrenChanged.signal(); - eventProcessed = true; - } - - if (!(processEvent(event)) && (!eventProcessed)) { - LOG.warn("process: Unknown and unprocessed event (path=" + - event.getPath() + ", type=" + event.getType() + - ", state=" + event.getState() + ")"); - } - } -}
