Updated Branches: refs/heads/perf 882545d1c -> ed8b54eee
GIRAPH-566: Make option for aggregators to be configurable (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f4bd1996 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f4bd1996 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f4bd1996 Branch: refs/heads/perf Commit: f4bd1996e66fbe3f4daf8aca1e7be1e9cead5dee Parents: a802fef Author: Maja Kabiljo <[email protected]> Authored: Thu Mar 21 15:49:12 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Mar 21 15:49:12 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../java/org/apache/giraph/comm/ServerData.java | 4 ++-- .../giraph/comm/aggregators/AggregatorUtils.java | 15 +++++---------- .../comm/aggregators/AllAggregatorServerData.java | 12 +++++++++--- .../aggregators/OwnerAggregatorServerData.java | 10 ++++++++-- .../giraph/worker/WorkerAggregatorHandler.java | 3 ++- 6 files changed, 28 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 3032873..a1446a7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-566: Make option for aggregators to be configurable (majakabiljo) + GIRAPH-575: update hive-io (nitay) GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index e6dff8c..70dc156 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -107,8 +107,8 @@ public class ServerData<I extends WritableComparable, new SimplePartitionStore<I, V, E, M>(configuration, context); } edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context); - ownerAggregatorData = new OwnerAggregatorServerData(context); - allAggregatorData = new AllAggregatorServerData(context); + ownerAggregatorData = new OwnerAggregatorServerData(context, configuration); + allAggregatorData = new AllAggregatorServerData(context, configuration); } public EdgeStore<I, V, E, M> getEdgeStore() { http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java index 0abd7e1..ceb30a8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java @@ -20,6 +20,7 @@ package org.apache.giraph.comm.aggregators; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; @@ -78,19 +79,13 @@ public class AggregatorUtils { * catch all exceptions. * * @param aggregatorClass Class of aggregator + * @param conf Configuration * @return New aggregator */ public static Aggregator<Writable> newAggregatorInstance( - Class<Aggregator<Writable>> aggregatorClass) { - try { - return aggregatorClass.newInstance(); - } catch (InstantiationException e) { - throw new IllegalStateException("createAggregator: " + - "InstantiationException for aggregator class " + aggregatorClass, e); - } catch (IllegalAccessException e) { - throw new IllegalStateException("createAggregator: " + - "IllegalAccessException for aggregator class " + aggregatorClass, e); - } + Class<Aggregator<Writable>> aggregatorClass, + ImmutableClassesGiraphConfiguration conf) { + return ReflectionUtils.newInstance(aggregatorClass, conf); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java index f38c6cd..177e738 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java @@ -19,6 +19,7 @@ package org.apache.giraph.comm.aggregators; import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.master.MasterInfo; import org.apache.giraph.utils.TaskIdsPermitsBarrier; import org.apache.hadoop.io.Writable; @@ -83,14 +84,19 @@ public class AllAggregatorServerData { private final TaskIdsPermitsBarrier workersBarrier; /** Progressable used to report progress */ private final Progressable progressable; + /** Configuration */ + private final ImmutableClassesGiraphConfiguration conf; /** * Constructor * * @param progressable Progressable used to report progress + * @param conf Configuration */ - public AllAggregatorServerData(Progressable progressable) { + public AllAggregatorServerData(Progressable progressable, + ImmutableClassesGiraphConfiguration conf) { this.progressable = progressable; + this.conf = conf; workersBarrier = new TaskIdsPermitsBarrier(progressable); masterBarrier = new TaskIdsPermitsBarrier(progressable); } @@ -106,7 +112,7 @@ public class AllAggregatorServerData { aggregatorClassMap.put(name, aggregatorClass); if (!aggregatorTypesMap.containsKey(aggregatorClass)) { aggregatorTypesMap.putIfAbsent(aggregatorClass, - AggregatorUtils.newAggregatorInstance(aggregatorClass)); + AggregatorUtils.newAggregatorInstance(aggregatorClass, conf)); } progressable.progress(); } @@ -225,7 +231,7 @@ public class AllAggregatorServerData { currentAggregatorMap.get(entry.getKey()); if (aggregator == null) { currentAggregatorMap.put(entry.getKey(), - AggregatorUtils.newAggregatorInstance(entry.getValue())); + AggregatorUtils.newAggregatorInstance(entry.getValue(), conf)); } else { aggregator.reset(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java index bd6068a..eb25a2e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java @@ -19,6 +19,7 @@ package org.apache.giraph.comm.aggregators; import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.TaskIdsPermitsBarrier; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; @@ -72,14 +73,19 @@ public class OwnerAggregatorServerData { private final TaskIdsPermitsBarrier workersBarrier; /** Progressable used to report progress */ private final Progressable progressable; + /** Configuration */ + private final ImmutableClassesGiraphConfiguration conf; /** * Constructor * * @param progressable Progressable used to report progress + * @param conf Configuration */ - public OwnerAggregatorServerData(Progressable progressable) { + public OwnerAggregatorServerData(Progressable progressable, + ImmutableClassesGiraphConfiguration conf) { this.progressable = progressable; + this.conf = conf; workersBarrier = new TaskIdsPermitsBarrier(progressable); } @@ -95,7 +101,7 @@ public class OwnerAggregatorServerData { LOG.debug("registerAggregator: The first registration after a reset()"); } myAggregatorMap.putIfAbsent(name, - AggregatorUtils.newAggregatorInstance(aggregatorClass)); + AggregatorUtils.newAggregatorInstance(aggregatorClass, conf)); progressable.progress(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index 3c18449..9a8a8b8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -290,7 +290,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) { threadAggregatorMap.put(entry.getKey(), AggregatorUtils.newAggregatorInstance( - (Class<Aggregator<Writable>>) entry.getValue().getClass())); + (Class<Aggregator<Writable>>) entry.getValue().getClass(), + conf)); } }
