Updated Branches: refs/heads/trunk 2d7c84c80 -> f630e3d41
GIRAPH-472: Refactor MapFunctions enum to be more general (ereisman) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f630e3d4 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f630e3d4 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f630e3d4 Branch: refs/heads/trunk Commit: f630e3d4129fc39a3350c1dc2278fa9670111e8c Parents: 2d7c84c Author: Eli Reisman <[email protected]> Authored: Fri Jan 18 10:53:36 2013 -0800 Committer: Eli Reisman <[email protected]> Committed: Fri Jan 18 10:53:36 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/graph/GraphFunctions.java | 96 +++++++++++++++ .../java/org/apache/giraph/graph/GraphMapper.java | 38 +++--- .../java/org/apache/giraph/graph/MapFunctions.java | 92 -------------- .../org/apache/giraph/master/BspServiceMaster.java | 14 +- .../org/apache/giraph/worker/BspServiceWorker.java | 8 +- 6 files changed, 128 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f630e3d4/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 57ca440..8ac7c18 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-472: Refactor MapFunctions enum to be more general (ereisman) + GIRAPH-254: Constant Variable name misspelled in HashMasterPartitioner.java (Gustavo Salazar Torres via ereisman) GIRAPH-431: Support edge and vertex value input formats in GiraphRunner (apresta) http://git-wip-us.apache.org/repos/asf/giraph/blob/f630e3d4/giraph-core/src/main/java/org/apache/giraph/graph/GraphFunctions.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphFunctions.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphFunctions.java new file mode 100644 index 0000000..308499f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphFunctions.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +/** + * Each compute node running on the underlying cluster + * is marked with this enum to indicate the worker or + * master task(s) it must perform during job runs. + */ +public enum GraphFunctions { + /** Undecided yet */ + UNKNOWN { + @Override public boolean isMaster() { return false; } + @Override public boolean isWorker() { return false; } + @Override public boolean isZooKeeper() { return false; } + }, + /** Only be the master */ + MASTER_ONLY { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return false; } + @Override public boolean isZooKeeper() { return false; } + }, + /** Only be the master and ZooKeeper */ + MASTER_ZOOKEEPER_ONLY { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return false; } + @Override public boolean isZooKeeper() { return true; } + }, + /** Only be the worker */ + WORKER_ONLY { + @Override public boolean isMaster() { return false; } + @Override public boolean isWorker() { return true; } + @Override public boolean isZooKeeper() { return false; } + }, + /** Do master, worker, and ZooKeeper */ + ALL { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return true; } + @Override public boolean isZooKeeper() { return true; } + }, + /** Do master and worker */ + ALL_EXCEPT_ZOOKEEPER { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return true; } + @Override public boolean isZooKeeper() { return false; } + }; + + /** + * Tell whether this function acts as a master. + * + * @return true iff this map function is a master + */ + public abstract boolean isMaster(); + + /** + * Tell whether this function acts as a worker. + * + * @return true iff this map function is a worker + */ + public abstract boolean isWorker(); + + /** + * Tell whether this function acts as a ZooKeeper server. + * + * @return true iff this map function is a zookeeper server + */ + public abstract boolean isZooKeeper(); + + public boolean isKnown() { + return this != UNKNOWN; + } + + public boolean isUnknown() { + return !isKnown(); + } + + public boolean isNotAWorker() { + return isKnown() && !isWorker(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f630e3d4/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java index dd4dee4..9470a06 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java @@ -124,7 +124,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, /** Already complete? */ private boolean done = false; /** What kind of functions is this mapper doing? */ - private MapFunctions mapFunctions = MapFunctions.UNKNOWN; + private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN; /** Total number of vertices in the graph (at this time) */ private long numVertices = -1; /** Total number of edges in the graph (at this time) */ @@ -157,8 +157,8 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, * * @return Map functions of this mapper. */ - public MapFunctions getMapFunctions() { - return mapFunctions; + public GraphFunctions getGraphFunctions() { + return graphFunctions; } /** @@ -259,33 +259,33 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, * ZooKeeper * @return Functions that this mapper should do. */ - private static MapFunctions determineMapFunctions( + private static GraphFunctions determineMapFunctions( ImmutableClassesGiraphConfiguration conf, ZooKeeperManager zkManager) { boolean splitMasterWorker = conf.getSplitMasterWorker(); int taskPartition = conf.getTaskPartition(); boolean zkAlreadyProvided = conf.getZookeeperList() != null; - MapFunctions functions = MapFunctions.UNKNOWN; + GraphFunctions functions = GraphFunctions.UNKNOWN; // What functions should this mapper do? if (!splitMasterWorker) { if ((zkManager != null) && zkManager.runsZooKeeper()) { - functions = MapFunctions.ALL; + functions = GraphFunctions.ALL; } else { - functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER; + functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER; } } else { if (zkAlreadyProvided) { int masterCount = conf.getZooKeeperServerCount(); if (taskPartition < masterCount) { - functions = MapFunctions.MASTER_ONLY; + functions = GraphFunctions.MASTER_ONLY; } else { - functions = MapFunctions.WORKER_ONLY; + functions = GraphFunctions.WORKER_ONLY; } } else { if ((zkManager != null) && zkManager.runsZooKeeper()) { - functions = MapFunctions.MASTER_ZOOKEEPER_ONLY; + functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY; } else { - functions = MapFunctions.WORKER_ONLY; + functions = GraphFunctions.WORKER_ONLY; } } } @@ -390,7 +390,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, } context.setStatus("setup: Connected to Zookeeper service " + serverPortList); - this.mapFunctions = determineMapFunctions(conf, zkManager); + this.graphFunctions = determineMapFunctions(conf, zkManager); // Sometimes it takes a while to get multiple ZooKeeper servers up if (conf.getZooKeeperServerCount() > 1) { @@ -399,7 +399,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, } int sessionMsecTimeout = conf.getZooKeeperSessionTimeout(); try { - if (mapFunctions.isMaster()) { + if (graphFunctions.isMaster()) { if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceMaster " + "(master thread)..."); @@ -409,7 +409,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, masterThread = new MasterThread<I, V, E, M>(serviceMaster, context); masterThread.start(); } - if (mapFunctions.isWorker()) { + if (graphFunctions.isWorker()) { if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceWorker..."); } @@ -431,7 +431,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, "setup: Offlining servers due to exception...", e); } - context.setStatus(getMapFunctions().toString() + " starting..."); + context.setStatus(getGraphFunctions().toString() + " starting..."); } /** @@ -509,7 +509,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, GiraphMetrics.get(). resetSuperstepMetrics(BspService.INPUT_SUPERSTEP); - if (mapFunctions.isNotAWorker()) { + if (graphFunctions.isNotAWorker()) { if (LOG.isInfoEnabled()) { LOG.info("map: No need to do anything when not a worker"); } @@ -691,7 +691,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, public void cleanup(Context context) throws IOException, InterruptedException { if (LOG.isInfoEnabled()) { - LOG.info("cleanup: Starting for " + getMapFunctions()); + LOG.info("cleanup: Starting for " + getGraphFunctions()); } if (done) { return; @@ -741,7 +741,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, * Cleanup ZooKeeper ona failure */ private void zooKeeperCleanup() { - if (mapFunctions.isZooKeeper()) { + if (graphFunctions.isZooKeeper()) { // ZooKeeper may have had an issue if (zkManager != null) { zkManager.logZooKeeperOutput(Level.WARN); @@ -754,7 +754,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, */ private void workerFailureCleanup() { try { - if (mapFunctions.isWorker()) { + if (graphFunctions.isWorker()) { serviceWorker.failureCleanup(); } // Checkstyle exception due to needing to get the original http://git-wip-us.apache.org/repos/asf/giraph/blob/f630e3d4/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java b/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java deleted file mode 100644 index f5909d3..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -/** What kinds of functions to run on this mapper */ -public enum MapFunctions { - /** Undecided yet */ - UNKNOWN { - @Override public boolean isMaster() { return false; } - @Override public boolean isWorker() { return false; } - @Override public boolean isZooKeeper() { return false; } - }, - /** Only be the master */ - MASTER_ONLY { - @Override public boolean isMaster() { return true; } - @Override public boolean isWorker() { return false; } - @Override public boolean isZooKeeper() { return false; } - }, - /** Only be the master and ZooKeeper */ - MASTER_ZOOKEEPER_ONLY { - @Override public boolean isMaster() { return true; } - @Override public boolean isWorker() { return false; } - @Override public boolean isZooKeeper() { return true; } - }, - /** Only be the worker */ - WORKER_ONLY { - @Override public boolean isMaster() { return false; } - @Override public boolean isWorker() { return true; } - @Override public boolean isZooKeeper() { return false; } - }, - /** Do master, worker, and ZooKeeper */ - ALL { - @Override public boolean isMaster() { return true; } - @Override public boolean isWorker() { return true; } - @Override public boolean isZooKeeper() { return true; } - }, - /** Do master and worker */ - ALL_EXCEPT_ZOOKEEPER { - @Override public boolean isMaster() { return true; } - @Override public boolean isWorker() { return true; } - @Override public boolean isZooKeeper() { return false; } - }; - - /** - * Tell whether this function acts as a master. - * - * @return true iff this map function is a master - */ - public abstract boolean isMaster(); - - /** - * Tell whether this function acts as a worker. - * - * @return true iff this map function is a worker - */ - public abstract boolean isWorker(); - - /** - * Tell whether this function acts as a ZooKeeper server. - * - * @return true iff this map function is a zookeeper server - */ - public abstract boolean isZooKeeper(); - - public boolean isKnown() { - return this != UNKNOWN; - } - - public boolean isUnknown() { - return !isKnown(); - } - - public boolean isNotAWorker() { - return isKnown() && !isWorker(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/f630e3d4/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 33f9f4a..61748cc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -32,7 +32,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.counters.GiraphStats; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.bsp.BspService; -import org.apache.giraph.graph.MapFunctions; +import org.apache.giraph.graph.GraphFunctions; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.graph.GlobalStats; @@ -442,7 +442,7 @@ public class BspServiceMaster<I extends WritableComparable, failJob = false; break; } - getContext().setStatus(getGraphMapper().getMapFunctions() + " " + + getContext().setStatus(getGraphMapper().getGraphFunctions() + " " + "checkWorkers: Only found " + totalResponses + " responses of " + maxWorkers + @@ -491,7 +491,7 @@ public class BspServiceMaster<I extends WritableComparable, return null; } - getContext().setStatus(getGraphMapper().getMapFunctions() + " " + + getContext().setStatus(getGraphMapper().getGraphFunctions() + " " + "checkWorkers: Done - Found " + totalResponses + " responses of " + maxWorkers + " needed to start superstep " + getSuperstep()); @@ -1270,7 +1270,7 @@ public class BspServiceMaster<I extends WritableComparable, LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers); } } - getContext().setStatus(getGraphMapper().getMapFunctions() + " - " + + getContext().setStatus(getGraphMapper().getGraphFunctions() + " - " + finishedHostnameIdList.size() + " finished out of " + workerInfoList.size() + @@ -1561,9 +1561,9 @@ public class BspServiceMaster<I extends WritableComparable, } // Need to wait for the number of workers and masters to complete int maxTasks = BspInputFormat.getMaxTasks(getConfiguration()); - if ((getGraphMapper().getMapFunctions() == MapFunctions.ALL) || - (getGraphMapper().getMapFunctions() == - MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { + if ((getGraphMapper().getGraphFunctions() == GraphFunctions.ALL) || + (getGraphMapper().getGraphFunctions() == + GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) { maxTasks *= 2; } List<String> cleanedUpChildrenList = null; http://git-wip-us.apache.org/repos/asf/giraph/blob/f630e3d4/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 31a4dc6..f2ccb24 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -695,7 +695,7 @@ else[HADOOP_NON_SECURE]*/ } getContext().setStatus("startSuperstep: " + - getGraphMapper().getMapFunctions().toString() + + getGraphMapper().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep()); return addressesAndPartitions.getPartitionOwners(); @@ -747,7 +747,7 @@ else[HADOOP_NON_SECURE]*/ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "finishSuperstep: (waiting for rest " + "of workers) " + - getGraphMapper().getMapFunctions().toString() + + getGraphMapper().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep()); @@ -765,7 +765,7 @@ else[HADOOP_NON_SECURE]*/ } incrCachedSuperstep(); getContext().setStatus("finishSuperstep: (all workers done) " + - getGraphMapper().getMapFunctions().toString() + + getGraphMapper().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep()); @@ -952,7 +952,7 @@ else[HADOOP_NON_SECURE]*/ public void storeCheckpoint() throws IOException { LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "storeCheckpoint: Starting checkpoint " + - getGraphMapper().getMapFunctions().toString() + + getGraphMapper().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep());
