Updated Branches: refs/heads/trunk 34abd6f5b -> 81b837d36
GIRAPH-609: More information on runtime exceptions for Callables Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/81b837d3 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/81b837d3 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/81b837d3 Branch: refs/heads/trunk Commit: 81b837d36189a1d763bd1747ca0550107c155664 Parents: 34abd6f Author: Avery Ching <[email protected]> Authored: Mon Apr 8 10:12:42 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Mon Apr 8 11:11:49 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/edge/EdgeStore.java | 17 ++-- .../org/apache/giraph/graph/GraphTaskManager.java | 6 +- .../org/apache/giraph/master/BspServiceMaster.java | 5 +- .../apache/giraph/utils/LogStacktraceCallable.java | 61 +++++++++++++++ .../org/apache/giraph/worker/BspServiceWorker.java | 6 +- 6 files changed, 85 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index cd21461..f7e0af6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-609: More information on runtime exceptions for Callables (aching) + GIRAPH-607: Hive IO bump (nitay) GIRAPH-564: Input/output formats and readers/writers should implement http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index d6653d0..01a67dd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ -20,24 +20,24 @@ package org.apache.giraph.edge; import com.google.common.collect.MapMaker; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.partition.Partition; import org.apache.giraph.utils.ByteArrayVertexIdEdges; +import org.apache.giraph.utils.LogStacktraceCallable; import org.apache.giraph.utils.ProgressableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * Collects incoming edges for vertices owned by this worker. * @@ -218,7 +218,8 @@ public class EdgeStore<I extends WritableComparable, return null; } }; - movePartitionExecutor.submit(moveCallable); + movePartitionExecutor.submit( + new LogStacktraceCallable<Void>(moveCallable)); } movePartitionExecutor.shutdown(); http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 8ed44e8..9823532 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -37,6 +37,7 @@ import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; +import org.apache.giraph.utils.LogStacktraceCallable; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ReflectionUtils; @@ -752,7 +753,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, computePartitionIdQueue, conf, serviceWorker); - partitionFutures.add(partitionExecutor.submit(computeCallable)); + LogStacktraceCallable<Collection<PartitionStats>> wrapped = + new LogStacktraceCallable<Collection<PartitionStats>>( + computeCallable); + partitionFutures.add(partitionExecutor.submit(wrapped)); } // Wait until all the threads are done to wait on all requests http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index affe4ff..d01dbb4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -56,6 +56,7 @@ import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; +import org.apache.giraph.utils.LogStacktraceCallable; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.zk.BspEvent; @@ -652,8 +653,8 @@ public class BspServiceMaster<I extends WritableComparable, boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf); for (int i = 0; i < splitList.size(); ++i) { InputSplit inputSplit = splitList.get(i); - taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i, - writeLocations)); + taskExecutor.submit(new LogStacktraceCallable<Void>( + new WriteInputSplit(inputSplit, inputSplitsPath, i, writeLocations))); } taskExecutor.shutdown(); ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext()); http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java new file mode 100644 index 0000000..730825d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import java.util.concurrent.Callable; +import org.apache.log4j.Logger; + +/** + * A wrapper to improve debugging. It passes the call() invocation to the + * provided callable, and upon any exception logs the stacktrace and rethrows + * the exception. The logging functionality is missing in FutureTask. + * + * @param <V> Return type of call() + */ +public class LogStacktraceCallable<V> implements Callable<V> { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(LogStacktraceCallable.class); + + /** Pass call() to this callable. */ + private Callable<V> callable; + + /** + * Construct an instance that will pass call() to the given callable. + * + * @param callable Callable + */ + public LogStacktraceCallable(Callable<V> callable) { + this.callable = callable; + } + + @Override + public V call() throws Exception { + try { + return callable.call(); + // We catch, log stack trace of, and rethrow all exceptions. It's OK to + // skip style check. + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + LOG.error("Execution of callable failed", e); + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 061df68..c20d06e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -42,6 +42,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.io.superstep_output.SuperstepOutput; +import org.apache.giraph.utils.LogStacktraceCallable; import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexOutputFormat; @@ -283,7 +284,10 @@ public class BspServiceWorker<I extends WritableComparable, for (int i = 0; i < numThreads; ++i) { Callable<VertexEdgeCount> inputSplitsCallable = inputSplitsCallableFactory.newCallable(i); - threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable)); + LogStacktraceCallable<VertexEdgeCount> wrapped = + new LogStacktraceCallable<VertexEdgeCount>( + inputSplitsCallable); + threadsFutures.add(inputSplitsExecutor.submit(wrapped)); } // Wait until all the threads are done to wait on all requests
