Updated Branches: refs/heads/trunk 533d52159 -> e4e6af68f
GIRAPH-465: MapFunctions cleanup (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e4e6af68 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e4e6af68 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e4e6af68 Branch: refs/heads/trunk Commit: e4e6af68f3577d5923a4cb0f71321e0ecd204aae Parents: 533d521 Author: Nitay Joffe <[email protected]> Authored: Thu Jan 3 12:31:11 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Fri Jan 4 10:59:05 2013 -0500 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 6 +- .../org/apache/giraph/graph/BspServiceMaster.java | 1 - .../java/org/apache/giraph/graph/GraphMapper.java | 85 ++++++-------- .../java/org/apache/giraph/graph/MapFunctions.java | 92 +++++++++++++++ 5 files changed, 133 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/e4e6af68/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index e36ffcd..0e14a80 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-465: MapFunctions cleanup (nitay) + GIRAPH-464: MasterObserver#applicationFailed callback (nitay) GIRAPH-458: split formats module into accumulo,hbase,hcatalog (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/e4e6af68/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 3e14aad..d5b9efe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -376,8 +376,7 @@ public class GiraphConfiguration extends Configuration } public int getZooKeeperServerCount() { - return getInt(ZOOKEEPER_SERVER_COUNT, - ZOOKEEPER_SERVER_COUNT_DEFAULT); + return getInt(ZOOKEEPER_SERVER_COUNT, ZOOKEEPER_SERVER_COUNT_DEFAULT); } /** @@ -390,8 +389,7 @@ public class GiraphConfiguration extends Configuration } public int getZooKeeperSessionTimeout() { - return getInt(ZOOKEEPER_SESSION_TIMEOUT, - ZOOKEEPER_SESSION_TIMEOUT_DEFAULT); + return getInt(ZOOKEEPER_SESSION_TIMEOUT, ZOOKEEPER_SESSION_TIMEOUT_DEFAULT); } public int getZookeeperOpsMaxAttempts() { http://git-wip-us.apache.org/repos/asf/giraph/blob/e4e6af68/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java index 41bbcee..85a1da8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java @@ -30,7 +30,6 @@ import org.apache.giraph.comm.netty.NettyMasterServer; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.counters.GiraphStats; -import org.apache.giraph.graph.GraphMapper.MapFunctions; import org.apache.giraph.graph.partition.MasterGraphPartitioner; import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.giraph.graph.partition.PartitionStats; http://git-wip-us.apache.org/repos/asf/giraph/blob/e4e6af68/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java index e491840..e65ec3c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java @@ -143,22 +143,6 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, /** Timer for WorkerContext#preSuperstep() */ private GiraphTimer wcPreSuperstepTimer; - /** What kinds of functions to run on this mapper */ - public enum MapFunctions { - /** Undecided yet */ - UNKNOWN, - /** Only be the master */ - MASTER_ONLY, - /** Only be the master and ZooKeeper */ - MASTER_ZOOKEEPER_ONLY, - /** Only be the worker */ - WORKER_ONLY, - /** Do master, worker, and ZooKeeper */ - ALL, - /** Do master and worker */ - ALL_EXCEPT_ZOOKEEPER - } - /** * Get the map function enum. * @@ -406,10 +390,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, } int sessionMsecTimeout = conf.getZooKeeperSessionTimeout(); try { - if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) || - (mapFunctions == MapFunctions.MASTER_ONLY) || - (mapFunctions == MapFunctions.ALL) || - (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { + if (mapFunctions.isMaster()) { if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceMaster " + "(master thread)..."); @@ -419,9 +400,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, masterThread = new MasterThread<I, V, E, M>(serviceMaster, context); masterThread.start(); } - if ((mapFunctions == MapFunctions.WORKER_ONLY) || - (mapFunctions == MapFunctions.ALL) || - (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { + if (mapFunctions.isWorker()) { if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceWorker..."); } @@ -437,8 +416,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, } catch (IOException e) { LOG.error("setup: Caught exception just before end of setup", e); if (zkManager != null) { - zkManager.offlineZooKeeperServers( - ZooKeeperManager.State.FAILED); + zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED); } throw new RuntimeException( "setup: Offlining servers due to exception...", e); @@ -522,8 +500,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, GiraphMetrics.get(). resetSuperstepMetrics(BspService.INPUT_SUPERSTEP); - if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) || - (mapFunctions == MapFunctions.MASTER_ONLY)) { + if (mapFunctions.isNotAWorker()) { if (LOG.isInfoEnabled()) { LOG.info("map: No need to do anything when not a worker"); } @@ -723,8 +700,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, LOG.error("cleanup: Master thread couldn't join"); } if (zkManager != null) { - zkManager.offlineZooKeeperServers( - ZooKeeperManager.State.FINISHED); + zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED); } } @@ -745,27 +721,40 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, // CHECKSTYLE: stop IllegalCatch } catch (RuntimeException e) { // CHECKSTYLE: resume IllegalCatch - if (mapFunctions == MapFunctions.UNKNOWN || - mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) { - // ZooKeeper may have had an issue - if (zkManager != null) { - zkManager.logZooKeeperOutput(Level.WARN); - } - } - try { - if (mapFunctions == MapFunctions.WORKER_ONLY) { - serviceWorker.failureCleanup(); - } - // Checkstyle exception due to needing to get the original - // exception on failure - // CHECKSTYLE: stop IllegalCatch - } catch (RuntimeException e1) { - // CHECKSTYLE: resume IllegalCatch - LOG.error("run: Worker failure failed on another RuntimeException, " + - "original expection will be rethrown", e1); - } + zooKeeperCleanup(); + workerFailureCleanup(); throw new IllegalStateException( "run: Caught an unrecoverable exception " + e.getMessage(), e); } } + + /** + * Cleanup ZooKeeper ona failure + */ + private void zooKeeperCleanup() { + if (mapFunctions.isZooKeeper()) { + // ZooKeeper may have had an issue + if (zkManager != null) { + zkManager.logZooKeeperOutput(Level.WARN); + } + } + } + + /** + * Cleanup worker on a failure + */ + private void workerFailureCleanup() { + try { + if (mapFunctions.isWorker()) { + serviceWorker.failureCleanup(); + } + // Checkstyle exception due to needing to get the original + // exception on failure + // CHECKSTYLE: stop IllegalCatch + } catch (RuntimeException e1) { + // CHECKSTYLE: resume IllegalCatch + LOG.error("run: Worker failure failed on another RuntimeException, " + + "original expection will be rethrown", e1); + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/e4e6af68/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java b/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java new file mode 100644 index 0000000..f5909d3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/MapFunctions.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +/** What kinds of functions to run on this mapper */ +public enum MapFunctions { + /** Undecided yet */ + UNKNOWN { + @Override public boolean isMaster() { return false; } + @Override public boolean isWorker() { return false; } + @Override public boolean isZooKeeper() { return false; } + }, + /** Only be the master */ + MASTER_ONLY { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return false; } + @Override public boolean isZooKeeper() { return false; } + }, + /** Only be the master and ZooKeeper */ + MASTER_ZOOKEEPER_ONLY { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return false; } + @Override public boolean isZooKeeper() { return true; } + }, + /** Only be the worker */ + WORKER_ONLY { + @Override public boolean isMaster() { return false; } + @Override public boolean isWorker() { return true; } + @Override public boolean isZooKeeper() { return false; } + }, + /** Do master, worker, and ZooKeeper */ + ALL { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return true; } + @Override public boolean isZooKeeper() { return true; } + }, + /** Do master and worker */ + ALL_EXCEPT_ZOOKEEPER { + @Override public boolean isMaster() { return true; } + @Override public boolean isWorker() { return true; } + @Override public boolean isZooKeeper() { return false; } + }; + + /** + * Tell whether this function acts as a master. + * + * @return true iff this map function is a master + */ + public abstract boolean isMaster(); + + /** + * Tell whether this function acts as a worker. + * + * @return true iff this map function is a worker + */ + public abstract boolean isWorker(); + + /** + * Tell whether this function acts as a ZooKeeper server. + * + * @return true iff this map function is a zookeeper server + */ + public abstract boolean isZooKeeper(); + + public boolean isKnown() { + return this != UNKNOWN; + } + + public boolean isUnknown() { + return !isKnown(); + } + + public boolean isNotAWorker() { + return isKnown() && !isWorker(); + } +}
