[GIRAPH-1013] Adding reducer handle utilities Summary: Adds reducer handle utilities, along with more functional interfaces and PairWritable
Test Plan: mvn clean install Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D40269 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/77f8a075 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/77f8a075 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/77f8a075 Branch: refs/heads/trunk Commit: 77f8a075ccc029cb608a382f5deb7cc0b27b02e5 Parents: add1d4f Author: Igor Kabiljo <[email protected]> Authored: Wed Jun 17 12:47:52 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Thu Jun 25 17:39:32 2015 -0700 ---------------------------------------------------------------------- giraph-block-app/pom.xml | 4 + .../reducers/array/ArrayOfHandles.java | 127 ++++++ .../block_app/reducers/array/ArrayReduce.java | 211 ++++++++++ .../reducers/array/BasicArrayReduce.java | 353 ++++++++++++++++ .../reducers/array/HugeArrayUtils.java | 404 +++++++++++++++++++ .../block_app/reducers/array/package-info.java | 21 + .../CollectPrimitiveReduceOperation.java | 84 ++++ .../collect/CollectReduceOperation.java | 50 +++ .../CollectShardedPrimitiveReducerHandle.java | 96 +++++ .../collect/CollectShardedReducerHandle.java | 85 ++++ ...tShardedTuplesOfPrimitivesReducerHandle.java | 158 ++++++++ ...ollectTuplesOfPrimitivesReduceOperation.java | 96 +++++ .../reducers/collect/ShardedReducerHandle.java | 123 ++++++ .../reducers/collect/package-info.java | 21 + .../block_app/reducers/map/BasicMapReduce.java | 276 +++++++++++++ .../block_app/reducers/map/package-info.java | 21 + .../giraph/block_app/reducers/package-info.java | 21 + .../apache/giraph/function/TripleFunction.java | 41 ++ .../function/primitive/Obj2DoubleFunction.java | 33 ++ .../function/primitive/Obj2FloatFunction.java | 33 ++ .../function/primitive/Obj2LongFunction.java | 33 ++ .../reducers/array/ObjectStripingTest.java | 58 +++ giraph-core/pom.xml | 
4 + .../impl/KryoWrappedReduceOperation.java | 86 ++++ .../writable/tuple/DoubleDoubleWritable.java | 39 ++ .../writable/tuple/IntDoubleWritable.java | 40 ++ .../giraph/writable/tuple/IntIntWritable.java | 39 ++ .../giraph/writable/tuple/IntLongWritable.java | 40 ++ .../writable/tuple/LongDoubleWritable.java | 40 ++ .../giraph/writable/tuple/LongIntWritable.java | 40 ++ .../giraph/writable/tuple/LongLongWritable.java | 39 ++ .../giraph/writable/tuple/PairWritable.java | 113 ++++++ .../giraph/writable/tuple/package-info.java | 21 + pom.xml | 6 + 34 files changed, 2856 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml index 1f653bb..a05c1c5 100644 --- a/giraph-block-app/pom.xml +++ b/giraph-block-app/pom.xml @@ -87,6 +87,10 @@ under the License. <dependencies> <!-- compile dependencies. sorted lexicographically. 
--> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java new file mode 100644 index 0000000..053fd61 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.reducers.array; + +import java.util.ArrayList; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * ArrayHandle implemented as an array of individual handles. + * + * @param <H> Handle type + */ +public class ArrayOfHandles<H> implements ArrayHandle<H> { + protected final ArrayList<H> handles; + + public ArrayOfHandles(int count, Supplier<H> reduceHandleFactory) { + handles = new ArrayList<>(); + for (int i = 0; i < count; i++) { + handles.add(reduceHandleFactory.get()); + } + } + + public ArrayOfHandles(int count, Int2ObjFunction<H> reduceHandleFactory) { + handles = new ArrayList<>(); + for (int i = 0; i < count; i++) { + handles.add(reduceHandleFactory.apply(i)); + } + } + + @Override + public H get(int index) { + return handles.get(index); + } + + @Override + public int getStaticSize() { + return handles.size(); + } + + /** + * ReducerArrayHandle implemented as an array of separate reducer handles. 
+ * + * @param <H> Handle type + */ + public static class ArrayOfReducers<S, R> + extends ArrayOfHandles<ReducerHandle<S, R>> + implements ReducerArrayHandle<S, R> { + + public ArrayOfReducers( + int count, Supplier<ReducerHandle<S, R>> reduceHandleFactory) { + super(count, reduceHandleFactory); + } + + public ArrayOfReducers( + int count, Int2ObjFunction<ReducerHandle<S, R>> reduceHandleFactory) { + super(count, reduceHandleFactory); + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return getStaticSize(); + } + + @Override + public BroadcastArrayHandle<R> broadcastValue(final BlockMasterApi master) { + return new ArrayOfBroadcasts<>( + getStaticSize(), + new Int2ObjFunction<BroadcastHandle<R>>() { + @Override + public BroadcastHandle<R> apply(int index) { + return get(index).broadcastValue(master); + } + }); + } + } + + /** + * BroadcastArrayHandle implemented as an array of separate broadcast handles. + * + * @param <T> Handle type + */ + public static class ArrayOfBroadcasts<T> + extends ArrayOfHandles<BroadcastHandle<T>> + implements BroadcastArrayHandle<T> { + + public ArrayOfBroadcasts( + int count, + Int2ObjFunction<BroadcastHandle<T>> broadcastHandleFactory) { + super(count, broadcastHandleFactory); + } + + public ArrayOfBroadcasts( + int count, + Supplier<BroadcastHandle<T>> broadcastHandleFactory) { + super(count, broadcastHandleFactory); + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return getStaticSize(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java new file mode 100644 index 0000000..f2cdf8c --- /dev/null +++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.array; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Array; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.utils.ArrayWritable; +import org.apache.giraph.utils.WritableUtils; +import 
org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; + +/** + * One reducer representing reduction of an array of individual values. + * Elements are represented as objects, so BasicArrayReduce should be + * used instead when elements are primitive types. + * + * @param <S> Single value type, objects passed on workers + * @param <R> Reduced value type + */ +public class ArrayReduce<S, R extends Writable> + implements ReduceOperation<Pair<IntRef, S>, ArrayWritable<R>> { + private int fixedSize; + private ReduceOperation<S, R> elementReduceOp; + private Class<R> elementClass; + + public ArrayReduce() { + } + + /** + * Create ReduceOperation that reduces arrays by reducing individual + * elements. + * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public ArrayReduce(int fixedSize, ReduceOperation<S, R> elementReduceOp) { + this.fixedSize = fixedSize; + this.elementReduceOp = elementReduceOp; + init(); + } + + /** + * Registers one new reducer that will reduce an array of objects, + * by reducing individual elements using {@code elementReduceOp}. + * + * This function will return a ReducerArrayHandle to it, by which + * individual elements can be manipulated separately.
+ * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerArrayHandle + */ + public static <S, T extends Writable> + ReducerArrayHandle<S, T> createArrayHandles( + final int fixedSize, ReduceOperation<S, T> elementReduceOp, + CreateReducerFunctionApi createFunction) { + final ReducerHandle<Pair<IntRef, S>, ArrayWritable<T>> reduceHandle = + createFunction.createReducer( + new ArrayReduce<>(fixedSize, elementReduceOp)); + + final IntRef curIndex = new IntRef(0); + final MutablePair<IntRef, S> reusablePair = + MutablePair.of(new IntRef(0), null); + final ReducerHandle<S, T> elementReduceHandle = new ReducerHandle<S, T>() { + @Override + public T getReducedValue(MasterGlobalCommUsage master) { + ArrayWritable<T> result = reduceHandle.getReducedValue(master); + return result.get()[curIndex.value]; + } + + @Override + public void reduce(S valueToReduce) { + reusablePair.getLeft().value = curIndex.value; + reusablePair.setRight(valueToReduce); + reduceHandle.reduce(reusablePair); + } + + @Override + public BroadcastHandle<T> broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException(); + } + }; + + return new ReducerArrayHandle<S, T>() { + @Override + public ReducerHandle<S, T> get(int index) { + curIndex.value = index; + return elementReduceHandle; + } + + @Override + public int getStaticSize() { + return fixedSize; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return getStaticSize(); + } + + @Override + public BroadcastArrayHandle<T> broadcastValue(BlockMasterApi master) { + final BroadcastHandle<ArrayWritable<T>> broadcastHandle = + reduceHandle.broadcastValue(master); + final IntRef curIndex = new IntRef(0); + final BroadcastHandle<T> + elementBroadcastHandle = new BroadcastHandle<T>() { + @Override + public T getBroadcast(WorkerBroadcastUsage worker) { + ArrayWritable<T> result = 
broadcastHandle.getBroadcast(worker); + return result.get()[curIndex.value]; + } + }; + return new BroadcastArrayHandle<T>() { + @Override + public BroadcastHandle<T> get(int index) { + curIndex.value = index; + return elementBroadcastHandle; + } + + @Override + public int getStaticSize() { + return fixedSize; + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return getStaticSize(); + } + }; + } + }; + } + + private void init() { + elementClass = (Class<R>) elementReduceOp.createInitialValue().getClass(); + } + + @Override + public ArrayWritable<R> createInitialValue() { + R[] values = (R[]) Array.newInstance(elementClass, fixedSize); + for (int i = 0; i < fixedSize; i++) { + values[i] = elementReduceOp.createInitialValue(); + } + return new ArrayWritable<>(elementClass, values); + } + + @Override + public ArrayWritable<R> reduce( + ArrayWritable<R> curValue, Pair<IntRef, S> valueToReduce) { + int index = valueToReduce.getLeft().value; + curValue.get()[index] = + elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight()); + return curValue; + } + + @Override + public ArrayWritable<R> reduceMerge( + ArrayWritable<R> curValue, ArrayWritable<R> valueToReduce) { + for (int i = 0; i < fixedSize; i++) { + curValue.get()[i] = + elementReduceOp.reduceMerge( + curValue.get()[i], valueToReduce.get()[i]); + } + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(fixedSize); + WritableUtils.writeWritableObject(elementReduceOp, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + fixedSize = in.readInt(); + elementReduceOp = WritableUtils.readWritableObject(in, null); + init(); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java ---------------------------------------------------------------------- diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java new file mode 100644 index 0000000..91ced16 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.reducers.array; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; + +/** + * Efficient generic primitive array reduce operation. 
+ * + * Allows two modes - fixed size, and infinite size + * (with keeping only actually used elements and resizing) + * + * @param <S> Single value type + * @param <R> Reduced value type + */ +public class BasicArrayReduce<S, R extends Writable> + implements ReduceOperation<Pair<IntRef, S>, BasicArrayList<R>> { + private int fixedSize; + private PrimitiveTypeOps<R> typeOps; + private ReduceOperation<S, R> elementReduceOp; + private R initialElement; + private R reusable; + private R reusable2; + + public BasicArrayReduce() { + } + + + /** + * Create ReduceOperation that reduces BasicArrays by reducing individual + * elements, with predefined size. + * + * @param fixedSize Number of elements + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public BasicArrayReduce( + int fixedSize, + PrimitiveTypeOps<R> typeOps, + ReduceOperation<S, R> elementReduceOp) { + this.fixedSize = fixedSize; + this.typeOps = typeOps; + this.elementReduceOp = elementReduceOp; + init(); + } + + + /** + * Create ReduceOperation that reduces BasicArrays by reducing individual + * elements, with unbounded size. + * + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public BasicArrayReduce( + PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) { + this(-1, typeOps, elementReduceOp); + } + + + /** + * Registers one new local reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with unbounded size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. 
+ * + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi API for creating reducers + * @return Created ReducerArrayHandle + */ + public static <S, R extends Writable> + ReducerArrayHandle<S, R> createLocalArrayHandles( + PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp, + CreateReducersApi reduceApi) { + return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi); + } + + /** + * Registers one new local reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with predefined size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. + * + * @param fixedSize Number of elements + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi API for creating reducers + * @return Created ReducerArrayHandle + */ + public static <S, R extends Writable> + ReducerArrayHandle<S, R> createLocalArrayHandles( + int fixedSize, PrimitiveTypeOps<R> typeOps, + ReduceOperation<S, R> elementReduceOp, + final CreateReducersApi reduceApi) { + return createArrayHandles(fixedSize, typeOps, elementReduceOp, + new CreateReducerFunctionApi() { + @Override + public <S, R extends Writable> ReducerHandle<S, R> createReducer( + ReduceOperation<S, R> reduceOp) { + return reduceApi.createLocalReducer(reduceOp); + } + }); + } + + /** + * Registers one new reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with unbounded size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. 
+ * + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerArrayHandle + */ + public static <S, R extends Writable> + ReducerArrayHandle<S, R> createArrayHandles( + PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp, + CreateReducerFunctionApi createFunction) { + return createArrayHandles(-1, typeOps, elementReduceOp, createFunction); + } + + /** + * Registers one new reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with predefined size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. + * + * @param fixedSize Number of elements + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerArrayHandle + */ + public static <S, R extends Writable> + ReducerArrayHandle<S, R> createArrayHandles( + final int fixedSize, final PrimitiveTypeOps<R> typeOps, + ReduceOperation<S, R> elementReduceOp, + CreateReducerFunctionApi createFunction) { + final ReducerHandle<Pair<IntRef, S>, BasicArrayList<R>> reduceHandle = + createFunction.createReducer( + new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp)); + final IntRef curIndex = new IntRef(0); + final R reusableValue = typeOps.create(); + final R initialValue = elementReduceOp.createInitialValue(); + final MutablePair<IntRef, S> reusablePair = + MutablePair.of(new IntRef(0), null); + final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() { + @Override + public R getReducedValue(MasterGlobalCommUsage master) { + BasicArrayList<R> result = reduceHandle.getReducedValue(master); + if (fixedSize == -1 && curIndex.value >= result.size()) { + typeOps.set(reusableValue, initialValue); 
+ } else { + result.getInto(curIndex.value, reusableValue); + } + return reusableValue; + } + + @Override + public void reduce(S valueToReduce) { + reusablePair.getLeft().value = curIndex.value; + reusablePair.setRight(valueToReduce); + reduceHandle.reduce(reusablePair); + } + + @Override + public BroadcastHandle<R> broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException(); + } + }; + + return new ReducerArrayHandle<S, R>() { + @Override + public ReducerHandle<S, R> get(int index) { + curIndex.value = index; + return elementReduceHandle; + } + + @Override + public int getStaticSize() { + if (fixedSize == -1) { + throw new UnsupportedOperationException( + "Cannot call size, when one is not specified upfront"); + } + return fixedSize; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return reduceHandle.getReducedValue(master).size(); + } + + @Override + public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) { + final BroadcastHandle<BasicArrayList<R>> broadcastHandle = + reduceHandle.broadcastValue(master); + final IntRef curIndex = new IntRef(0); + final R reusableValue = typeOps.create(); + final BroadcastHandle<R> + elementBroadcastHandle = new BroadcastHandle<R>() { + @Override + public R getBroadcast(WorkerBroadcastUsage worker) { + BasicArrayList<R> result = broadcastHandle.getBroadcast(worker); + if (fixedSize == -1 && curIndex.value >= result.size()) { + typeOps.set(reusableValue, initialValue); + } else { + result.getInto(curIndex.value, reusableValue); + } + return reusableValue; + } + }; + return new BroadcastArrayHandle<R>() { + @Override + public BroadcastHandle<R> get(int index) { + curIndex.value = index; + return elementBroadcastHandle; + } + + @Override + public int getStaticSize() { + if (fixedSize == -1) { + throw new UnsupportedOperationException( + "Cannot call size, when one is not specified upfront"); + } + return fixedSize; + } + + @Override + public int 
getBroadcastedSize(WorkerBroadcastUsage worker) { + return broadcastHandle.getBroadcast(worker).size(); + } + }; + } + }; + } + + + private void init() { + initialElement = elementReduceOp.createInitialValue(); + reusable = typeOps.create(); + reusable2 = typeOps.create(); + } + + @Override + public BasicArrayList<R> createInitialValue() { + if (fixedSize != -1) { + BasicArrayList<R> list = typeOps.createArrayList(fixedSize); + fill(list, fixedSize); + return list; + } else { + return typeOps.createArrayList(1); + } + } + + private void fill(BasicArrayList<R> list, int newSize) { + if (fixedSize != -1 && newSize > fixedSize) { + throw new IllegalArgumentException(newSize + " larger then " + fixedSize); + } + + if (list.capacity() < newSize) { + list.setCapacity(newSize); + } + while (list.size() < newSize) { + list.add(initialElement); + } + } + + @Override + public BasicArrayList<R> reduce( + BasicArrayList<R> curValue, Pair<IntRef, S> valueToReduce) { + int index = valueToReduce.getLeft().value; + fill(curValue, index + 1); + curValue.getInto(index, reusable); + R result = elementReduceOp.reduce(reusable, valueToReduce.getRight()); + curValue.set(index, result); + return curValue; + } + + @Override + public BasicArrayList<R> reduceMerge( + BasicArrayList<R> curValue, BasicArrayList<R> valueToReduce) { + fill(curValue, valueToReduce.size()); + for (int i = 0; i < valueToReduce.size(); i++) { + valueToReduce.getInto(i, reusable2); + curValue.getInto(i, reusable); + R result = elementReduceOp.reduceMerge(reusable, reusable2); + curValue.set(i, result); + } + + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(fixedSize); + TypeOpsUtils.writeTypeOps(typeOps, out); + WritableUtils.writeWritableObject(elementReduceOp, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + fixedSize = in.readInt(); + typeOps = TypeOpsUtils.readTypeOps(in); + elementReduceOp = 
WritableUtils.readWritableObject(in, null); + init(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java new file mode 100644 index 0000000..be5d4fe --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.reducers.array; + +import java.util.ArrayList; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts; +import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.function.ObjectHolder; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.utils.ArrayWritable; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; + +/** + * Utility class for dealing with huge arrays (i.e. large number of + * elements) within reducing/broadcasting. + * + * In Giraph, for each reducer there is a worker machine which is its owner, + * which does partial aggregation for it. So if we have only a single huge + * reducer, other workers will have to wait while that single worker is doing + * a huge reducing operation.
+ * On the other hand, each reducer has a meaningful overhead, so we should try + * to keep the number of reducers as low as possible (in total less than 10k is a + * good number). + * What we want is to split such huge reducers into slightly more than the number + * of worker reducers, and NUM_STRIPES = 50000 is used here as a good middle + * ground. + * + * So when we have a huge array, we don't want one reducer/broadcast for each + * element, but we also don't want one reducer/broadcast for the whole array. + * + * This class allows a transparent split into a reasonable number of reducers + * (~50000), which solves both of the above issues. + */ +public class HugeArrayUtils { + // Striping perfectly reducers of up to 25GB (i.e. 500KB * NUM_STRIPES). + private static final IntConfOption NUM_STRIPES = new IntConfOption( + "giraph.reducers.HugeArrayUtils.num_stripes", 50000, + "Number of distict reducers to create. If array is smaller then this" + + "number, each element will be it's own reducer"); + + private HugeArrayUtils() { } + + /** + * Create global array of reducers, by splitting the huge array + * into NUM_STRIPES number of parts. + * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi Api for creating reducers + * @return Created ReducerArrayHandle + */ + public static <S, R extends Writable> + ReducerArrayHandle<S, R> createGlobalReducerArrayHandle( + final int fixedSize, final ReduceOperation<S, R> elementReduceOp, + final CreateReducersApi reduceApi) { + return createGlobalReducerArrayHandle( + fixedSize, elementReduceOp, reduceApi, + NUM_STRIPES.get(reduceApi.getConf())); + } + + /** + * Create global array of reducers, by splitting the huge array + * into {@code maxNumStripes} number of parts.
+ * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi Api for creating reducers + * @param maxNumStripes Maximal number of reducers to create. + * @return Created ReducerArrayHandle + */ + public static <S, R extends Writable> + ReducerArrayHandle<S, R> createGlobalReducerArrayHandle( + final int fixedSize, final ReduceOperation<S, R> elementReduceOp, + final CreateReducersApi reduceApi, int maxNumStripes) { + PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull( + (Class<R>) elementReduceOp.createInitialValue().getClass()); + + final CreateReducerFunctionApi + createReducer = new CreateReducerFunctionApi() { + @Override + public <S, R extends Writable> ReducerHandle<S, R> createReducer( + ReduceOperation<S, R> reduceOp) { + return reduceApi.createGlobalReducer(reduceOp); + } + }; + + if (fixedSize < maxNumStripes) { + return new ArrayOfReducers<>( + fixedSize, + new Supplier<ReducerHandle<S, R>>() { + @Override + public ReducerHandle<S, R> get() { + return createReducer.createReducer(elementReduceOp); + } + }); + } else { + final ObjectStriping striping = + new ObjectStriping(fixedSize, maxNumStripes); + + final ArrayList<ReducerArrayHandle<S, R>> handles = + new ArrayList<>(striping.getSplits()); + for (int i = 0; i < striping.getSplits(); i++) { + if (typeOps != null) { + handles.add(BasicArrayReduce.createArrayHandles( + striping.getSplitSize(i), typeOps, + elementReduceOp, createReducer)); + } else { + handles.add(ArrayReduce.createArrayHandles( + striping.getSplitSize(i), elementReduceOp, createReducer)); + } + } + + return new ReducerArrayHandle<S, R>() { + @Override + public ReducerHandle<S, R> get(int index) { + if ((index >= fixedSize) || (index < 0)) { + throw new RuntimeException( + "Reducer Access out of bounds: requested : " + + index + " from array of size : " + fixedSize); + } + int reducerIndex = striping.getSplitIndex(index); + int insideIndex = 
striping.getInsideIndex(index); + return handles.get(reducerIndex).get(insideIndex); + } + + @Override + public int getStaticSize() { + return fixedSize; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return getStaticSize(); + } + + @Override + public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException("for now not supported"); + } + }; + } + } + + /** + * Broadcast a huge array, by splitting into NUM_STRIPES number of parts. + * + * @param count Number of elements + * @param valueSupplier Supplier of value to be broadcasted for a given index + * @param master Master API + * @return Created BroadcastArrayHandle + */ + public static <V extends Writable> BroadcastArrayHandle<V> broadcast( + final int count, + final Int2ObjFunction<V> valueSupplier, + final BlockMasterApi master) { + return broadcast(count, valueSupplier, null, master); + } + + /** + * Broadcast a huge array, by splitting into NUM_STRIPES number of parts. + * Efficient for primitive types, using BasicArray underneath. + * + * @param count Number of elements + * @param valueSupplier Supplier of value to be broadcasted for a given index + * @param typeOps Element TypeOps + * @param master Master API + * @return Created BroadcastArrayHandle + */ + public static <V extends Writable> BroadcastArrayHandle<V> broadcast( + final int count, + final Int2ObjFunction<V> valueSupplier, + final PrimitiveTypeOps<V> typeOps, + final BlockMasterApi master) { + int numStripes = NUM_STRIPES.get(master.getConf()); + if (count < numStripes) { + return new ArrayOfBroadcasts<>( + count, + new Int2ObjFunction<BroadcastHandle<V>>() { + @Override + public BroadcastHandle<V> apply(int i) { + // We create a copy because the valueSupplier might return a + // reusable obj. This function is NOT safe if typeOps is null + // & valueSupplier returns reusable + return master.broadcast( + typeOps != null ? 
+ typeOps.createCopy(valueSupplier.apply(i)) : + valueSupplier.apply(i)); + } + }); + } else { + ObjectStriping striping = new ObjectStriping(count, numStripes); + final Int2ObjFunction<BroadcastHandle<V>> handleSupplier; + + if (typeOps != null) { + handleSupplier = getPrimitiveBroadcastHandleSupplier( + valueSupplier, typeOps, master, striping); + } else { + handleSupplier = getObjectBroadcastHandleSupplier( + valueSupplier, master, striping); + } + return new BroadcastArrayHandle<V>() { + @Override + public BroadcastHandle<V> get(int index) { + if (index >= count || index < 0) { + throw new RuntimeException( + "Broadcast Access out of bounds: requested: " + + index + " from array of size : " + count); + } + return handleSupplier.apply(index); + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return count; + } + + @Override + public int getStaticSize() { + return count; + } + }; + } + } + + private static <V extends Writable> + Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier( + final Int2ObjFunction<V> valueSupplier, + final BlockMasterApi master, final ObjectStriping striping) { + final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>(); + final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts = + new ArrayOfHandles<>( + striping.getSplits(), + new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() { + @Override + public BroadcastHandle<ArrayWritable<V>> apply(int value) { + int size = striping.getSplitSize(value); + int start = striping.getSplitStart(value); + V[] array = (V[]) new Writable[size]; + for (int i = 0; i < size; i++) { + array[i] = valueSupplier.apply(start + i); + if (elementClass.get() == null) { + elementClass.apply((Class<V>) array[i].getClass()); + } + } + return master.broadcast( + new ArrayWritable<>(elementClass.get(), array)); + } + }); + + final IntRef insideIndex = new IntRef(-1); + final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder = + new 
ObjectHolder<>(); + + final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() { + @Override + public V getBroadcast(WorkerBroadcastUsage worker) { + return handleHolder.get().getBroadcast(worker).get()[insideIndex.value]; + } + }; + + return createBroadcastHandleSupplier( + striping, arrayOfBroadcasts, insideIndex, handleHolder, + reusableHandle); + } + + private static <V extends Writable> + Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier( + final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps, + final BlockMasterApi master, final ObjectStriping striping) { + final ArrayOfHandles<BroadcastHandle<BasicArrayList<V>>> arrayOfBroadcasts = + new ArrayOfHandles<>( + striping.getSplits(), + new Int2ObjFunction<BroadcastHandle<BasicArrayList<V>>>() { + @Override + public BroadcastHandle<BasicArrayList<V>> apply(int value) { + int size = striping.getSplitSize(value); + int start = striping.getSplitStart(value); + BasicArrayList<V> array = typeOps.createArrayList(size); + for (int i = 0; i < size; i++) { + array.add(valueSupplier.apply(start + i)); + } + return master.broadcast(array); + } + }); + + final IntRef insideIndex = new IntRef(-1); + final ObjectHolder<BroadcastHandle<BasicArrayList<V>>> handleHolder = + new ObjectHolder<>(); + final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() { + private final V reusable = typeOps.create(); + @Override + public V getBroadcast(WorkerBroadcastUsage worker) { + handleHolder.get().getBroadcast(worker).getInto( + insideIndex.value, reusable); + return reusable; + } + }; + + return createBroadcastHandleSupplier( + striping, arrayOfBroadcasts, insideIndex, handleHolder, + reusableHandle); + } + + private static <V extends Writable, A> + Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier( + final ObjectStriping striping, + final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts, + final IntRef insideIndex, + final ObjectHolder<BroadcastHandle<A>> 
handleHolder, + final BroadcastHandle<V> reusableHandle) { + final Int2ObjFunction<BroadcastHandle<V>> handleProvider = + new Int2ObjFunction<BroadcastHandle<V>>() { + @Override + public BroadcastHandle<V> apply(int index) { + int broadcastIndex = striping.getSplitIndex(index); + insideIndex.value = striping.getInsideIndex(index); + handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex)); + return reusableHandle; + } + }; + return handleProvider; + } + + /** + * Handles indices calculations when spliting one range into smaller number + * of splits, where indices stay consecutive. + */ + static class ObjectStriping { + private final int splits; + private final int indicesPerObject; + private final int overflowNum; + private final int beforeOverflow; + + public ObjectStriping(int size, int splits) { + this.splits = splits; + this.indicesPerObject = size / splits; + this.overflowNum = size % splits; + this.beforeOverflow = overflowNum * (indicesPerObject + 1); + } + + public int getSplits() { + return splits; + } + + public int getSplitSize(int splitIndex) { + return indicesPerObject + (splitIndex < overflowNum ? 
1 : 0); + } + + public int getSplitStart(int splitIndex) { + if (splitIndex < overflowNum) { + return splitIndex * (indicesPerObject + 1); + } else { + return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject; + } + } + + public int getSplitIndex(int objectIndex) { + if (objectIndex < beforeOverflow) { + return objectIndex / (indicesPerObject + 1); + } else { + return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum; + } + } + + public int getInsideIndex(int objectIndex) { + if (objectIndex < beforeOverflow) { + return objectIndex % (indicesPerObject + 1); + } else { + return (objectIndex - beforeOverflow) % indicesPerObject; + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java new file mode 100644 index 0000000..33f8a24 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducers for collecting arrays of objects.
+ */
+package org.apache.giraph.block_app.reducers.array;
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
new file mode 100644
index 0000000..13dd153
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.ResettableIterator;
+import org.apache.giraph.utils.WritableUtils;
+
+/**
+ * Reduce operation collecting all primitive values into a BasicArrayList
+ *
+ * @param <S> Primitive Writable type, which has its type ops
+ */
+public class CollectPrimitiveReduceOperation<S>
+    extends KryoWrappedReduceOperation<S, BasicArrayList<S>> {
+  /**
+   * Type ops for S; must be set (used by createList and write)
+   */
+  private PrimitiveTypeOps<S> typeOps;
+
+  /** For reflection only */
+  public CollectPrimitiveReduceOperation() {
+  }
+
+  public CollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public BasicArrayList<S> createValue() {
+    return createList();
+  }
+
+  @Override
+  public void reduce(BasicArrayList<S> reduceInto, S value) {
+    reduceInto.add(value);
+  }
+
+  @Override
+  public void reduceMerge(BasicArrayList<S> reduceInto,
+      BasicArrayList<S> toReduce) {
+    ResettableIterator<S> iterator = toReduce.fastIterator();
+    while (iterator.hasNext()) {
+      reduceInto.add(iterator.next());
+    }
+  }
+
+  public BasicArrayList<S> createList() {
+    return typeOps.createArrayList();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeClass(typeOps.getTypeClass(), out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    typeOps = TypeOpsUtils.getPrimitiveTypeOps(
+        WritableUtils.<S>readClass(in));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
new file mode 100644
index 0000000..304ac47
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+
+/**
+ * Reduce operation collecting all values into a single ArrayList
+ *
+ * @param <S> Type of values to collect
+ */
+public class CollectReduceOperation<S>
+    extends KryoWrappedReduceOperation<S, List<S>> {
+  @Override
+  public List<S> createValue() {
+    return createList();
+  }
+
+  @Override
+  public void reduce(List<S> reduceInto, S value) {
+    reduceInto.add(value);
+  }
+
+  @Override
+  public void reduceMerge(List<S> reduceInto, List<S> toReduce) {
+    reduceInto.addAll(toReduce);
+  }
+
+  public List<S> createList() {
+    return new ArrayList<>();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
new file mode 100644
index 0000000..b29b297
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle collecting all reduced values into a BasicArrayList,
+ * for primitive value types
+ *
+ * @param <S> Single value type
+ */
+public class CollectShardedPrimitiveReducerHandle<S>
+    extends ShardedReducerHandle<S, BasicArrayList<S>> {
+  /**
+   * Type ops for valueClass, resolved in the constructor
+   */
+  private final PrimitiveTypeOps<S> typeOps;
+
+  public CollectShardedPrimitiveReducerHandle(final CreateReducersApi reduceApi,
+      Class<S> valueClass) {
+    typeOps = TypeOpsUtils.getPrimitiveTypeOps(valueClass);
+    register(reduceApi);
+  }
+
+  @Override
+  public ReduceOperation<S, KryoWritableWrapper<BasicArrayList<S>>>
+  createReduceOperation() {
+    return new CollectPrimitiveReduceOperation<>(typeOps);
+  }
+
+  @Override
+  public BasicArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
+    int size = 0;
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      size += reducers.get(i).getReducedValue(master).get().size();
+    }
+    return createList(size);
+  }
+
+  public BasicArrayList<S> createList(int size) {
+    return typeOps.createArrayList(size);
+  }
+
+  @Override
+  public BroadcastHandle<BasicArrayList<S>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>> broadcasts) {
+    return new CollectShardedPrimitiveBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * Broadcast handle for CollectShardedPrimitiveReducerHandle
+   */
+  public class CollectShardedPrimitiveBroadcastHandle
+      extends ShardedBroadcastHandle {
+    public CollectShardedPrimitiveBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>>
+            broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public BasicArrayList<S> createBroadcastResult(
+        WorkerBroadcastUsage worker) {
+      int size = 0;
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        size += broadcasts.get(i).getBroadcast(worker).get().size();
+      }
+      return createList(size);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
new file mode 100644
index 0000000..5132ecf
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle collecting all reduced values into a single List
+ *
+ * @param <S> Single value type
+ */
+public class CollectShardedReducerHandle<S>
+    extends ShardedReducerHandle<S, List<S>> {
+  public CollectShardedReducerHandle(CreateReducersApi reduceApi) {
+    register(reduceApi);
+  }
+
+  @Override
+  public ReduceOperation<S, KryoWritableWrapper<List<S>>>
+  createReduceOperation() {
+    return new CollectReduceOperation<>();
+  }
+
+  @Override
+  public List<S> createReduceResult(MasterGlobalCommUsage master) {
+    int size = 0;
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      size += reducers.get(i).getReducedValue(master).get().size();
+    }
+    return createList(size);
+  }
+
+  public List<S> createList(int size) {
+    return new ArrayList<S>(size);
+  }
+
+  @Override
+  public BroadcastHandle<List<S>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
+    return new CollectShardedBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * BroadcastHandle for CollectShardedReducerHandle
+   */
+  public class CollectShardedBroadcastHandle extends ShardedBroadcastHandle {
+    public CollectShardedBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public List<S> createBroadcastResult(WorkerBroadcastUsage worker) {
+      int size = 0;
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        size += broadcasts.get(i).getBroadcast(worker).get().size();
+      }
+      return createList(size);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
new file mode 100644
index 0000000..3222c17
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle where we keep a list of reduced values,
+ * and values consist of multiple primitives, so we keep one primitive
+ * list per tuple component (all component lists have equal length)
+ */
+@SuppressWarnings("unchecked")
+public class CollectShardedTuplesOfPrimitivesReducerHandle
+extends ShardedReducerHandle<List<Object>, List<BasicArrayList>> {
+  /**
+   * Type ops of each tuple component, in component order
+   */
+  private final List<PrimitiveTypeOps> typeOpsList;
+
+  public CollectShardedTuplesOfPrimitivesReducerHandle(
+      final CreateReducersApi reduceApi, Class<?>... valueClasses) {
+    typeOpsList = new ArrayList<>();
+    for (Class<?> valueClass : valueClasses) {
+      typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(valueClass));
+    }
+    register(reduceApi);
+  }
+
+  public List<Object> createSingleValue() {
+    List<Object> ret = new ArrayList<>();
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.create());
+    }
+    return ret;
+  }
+
+  @Override
+  public ReduceOperation<List<Object>,
+      KryoWritableWrapper<List<BasicArrayList>>> createReduceOperation() {
+    return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList);
+  }
+
+  @Override
+  public List<BasicArrayList> createReduceResult(
+      MasterGlobalCommUsage master) {
+    int size = 0;
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      size += reducers.get(i).getReducedValue(master).get().get(0).size();
+    }
+    return createLists(size);
+  }
+
+  public List<BasicArrayList> createLists(int size) {
+    List<BasicArrayList> ret = new ArrayList<>(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.createArrayList(size));
+    }
+    return ret;
+  }
+
+  @Override
+  public BroadcastHandle<List<BasicArrayList>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
+          broadcasts) {
+    return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * BroadcastHandle for CollectShardedTuplesOfPrimitivesReducerHandle
+   */
+  public class CollectShardedTuplesOfPrimitivesBroadcastHandle
+      extends ShardedBroadcastHandle {
+    public CollectShardedTuplesOfPrimitivesBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
+            broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public List<BasicArrayList> createBroadcastResult(
+        WorkerBroadcastUsage worker) {
+      int size = 0;
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        size += broadcasts.get(i).getBroadcast(worker).get().get(0).size();
+      }
+      return createLists(size);
+    }
+  }
+
+  /**
+   * Reduce broadcast wrapper
+   */
+  public static class CollectShardedTuplesOfPrimitivesReduceBroadcast {
+    private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle;
+    private BroadcastHandle<List<BasicArrayList>> broadcastHandle;
+
+    /** Set reducer handle to just registered handle */
+    public void registeredReducer(CreateReducersApi reduceApi,
+        Class<?>... valueClasses) {
+      this.reducerHandle = new CollectShardedTuplesOfPrimitivesReducerHandle(
+          reduceApi, valueClasses);
+    }
+
+    public List<Object> createSingleValue() {
+      return reducerHandle.createSingleValue();
+    }
+
+    /** Reduce single value */
+    public void reduce(List<Object> valueToReduce) {
+      reducerHandle.reduce(valueToReduce);
+    }
+
+    /** Get reduced value */
+    public List<BasicArrayList> getReducedValue(MasterGlobalCommUsage master) {
+      return reducerHandle.getReducedValue(master);
+    }
+
+    /**
+     * Broadcast reduced value from master
+     */
+    public void broadcastValue(BlockMasterApi master) {
+      broadcastHandle = reducerHandle.broadcastValue(master);
+    }
+
+    /** Get broadcasted value */
+    public List<BasicArrayList> getBroadcast(WorkerBroadcastUsage worker) {
+      return broadcastHandle.getBroadcast(worker);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
new file mode 100644
index 0000000..afaba7a
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.ResettableIterator;
+import org.apache.giraph.utils.WritableUtils;
+
+/**
+ * Collects tuples of primitive values, one BasicArrayList per tuple component
+ */
+public class CollectTuplesOfPrimitivesReduceOperation
+    extends KryoWrappedReduceOperation<List<Object>, List<BasicArrayList>> {
+  /**
+   * Type ops of each tuple component, in order; must be set before use
+   */
+  private List<PrimitiveTypeOps> typeOpsList;
+
+  /** For reflection only */
+  public CollectTuplesOfPrimitivesReduceOperation() {
+  }
+
+  public CollectTuplesOfPrimitivesReduceOperation(
+      List<PrimitiveTypeOps> typeOpsList) {
+    this.typeOpsList = typeOpsList;
+  }
+
+  @Override
+  public List<BasicArrayList> createValue() {
+    List<BasicArrayList> ret = new ArrayList<>(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.createArrayList());
+    }
+    return ret;
+  }
+
+  @Override
+  public void reduce(List<BasicArrayList> reduceInto, List<Object> value) {
+    for (int i = 0; i < reduceInto.size(); i++) {
+      reduceInto.get(i).add(value.get(i));
+    }
+  }
+
+  @Override
+  public void reduceMerge(List<BasicArrayList> reduceInto,
+      List<BasicArrayList> toReduce) {
+    for (int i = 0; i < reduceInto.size(); i++) {
+      ResettableIterator iterator = toReduce.get(i).fastIterator();
+      while (iterator.hasNext()) {
+        reduceInto.get(i).add(iterator.next());
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      WritableUtils.writeClass(typeOps.getTypeClass(), out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    typeOpsList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(
+          WritableUtils.readClass(in)));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
new file mode 100644
index 0000000..0c17216
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+import org.apache.giraph.writable.kryo.TransientRandom;
+
+/**
+ * Reducing values into a list of reducers, randomly,
+ * and getting the results of all reducers together.
+ *
+ * Each value passed to {@link #reduce(Object)} goes to one of
+ * {@link #REDUCER_COUNT} local reducers picked at random. Reading the
+ * result (on master, or through the broadcast on workers) merges all
+ * shards back into a single value using {@link #createReduceOperation()}.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public abstract class ShardedReducerHandle<S, R>
+    implements ReducerHandle<S, R> {
+  // Use a prime number for number of reducers, large enough to make sure
+  // request sizes are within expected size (0.5MB)
+  protected static final int REDUCER_COUNT = 39989;
+
+  /** Picks the shard each reduced value is sent to */
+  protected final TransientRandom random = new TransientRandom();
+
+  /** One local reducer per shard; assigned by {@link #register} */
+  protected ArrayOfHandles.ArrayOfReducers<S,
+      KryoWritableWrapper<R>> reducers;
+
+  /**
+   * Register {@link #REDUCER_COUNT} local reducers, each created with
+   * its own call to {@link #createReduceOperation()}.
+   *
+   * @param reduceApi API used to create the local reducers
+   */
+  public final void register(final CreateReducersApi reduceApi) {
+    reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT,
+        new Supplier<ReducerHandle<S, KryoWritableWrapper<R>>>() {
+          @Override
+          public ReducerHandle<S, KryoWritableWrapper<R>> get() {
+            return reduceApi.createLocalReducer(createReduceOperation());
+          }
+        });
+  }
+
+  @Override
+  public final void reduce(S value) {
+    reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value);
+  }
+
+  @Override
+  public final R getReducedValue(MasterGlobalCommUsage master) {
+    KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
+        createReduceResult(master));
+    ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
+        createReduceOperation();
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      // reduceMerge returns the merged value, which is not guaranteed to
+      // be the same object as its first argument, so keep the returned one
+      ret = reduceOperation.reduceMerge(ret,
+          reducers.get(i).getReducedValue(master));
+    }
+    return ret.get();
+  }
+
+  /**
+   * Create the reduce operation used by every shard, and for merging
+   * the shards back together.
+   *
+   * @return Reduce operation over Kryo-wrapped results
+   */
+  public abstract ReduceOperation<S, KryoWritableWrapper<R>>
+  createReduceOperation();
+
+  /**
+   * Create the value into which all shards are merged on master.
+   * By default this is the unwrapped initial value of
+   * {@link #createReduceOperation()}.
+   *
+   * @param master Master global communication API
+   * @return Value to merge shard results into
+   */
+  public R createReduceResult(MasterGlobalCommUsage master) {
+    return createReduceOperation().createInitialValue().get();
+  }
+
+  /**
+   * Wrap the broadcasts of all shards into a single broadcast handle.
+   *
+   * @param broadcasts Broadcast handles of all shards
+   * @return Broadcast handle that merges all shards when read
+   */
+  public BroadcastHandle<R> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
+    return new ShardedBroadcastHandle(broadcasts);
+  }
+
+  @Override
+  public final BroadcastHandle<R> broadcastValue(BlockMasterApi masterApi) {
+    return createBroadcastHandle(reducers.broadcastValue(masterApi));
+  }
+
+  /**
+   * Broadcast for ShardedReducerHandle: merges all broadcasted shards
+   * each time the value is read on a worker.
+   */
+  public class ShardedBroadcastHandle implements BroadcastHandle<R> {
+    /** Broadcast handles of all shards */
+    protected final BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts;
+
+    /**
+     * Constructor
+     *
+     * @param broadcasts Broadcast handles of all shards
+     */
+    public ShardedBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
+      this.broadcasts = broadcasts;
+    }
+
+    /**
+     * Create the value into which all shards are merged on a worker.
+     * By default this is the unwrapped initial value of
+     * {@link ShardedReducerHandle#createReduceOperation()}.
+     *
+     * @param worker Worker broadcast API
+     * @return Value to merge shard broadcasts into
+     */
+    public R createBroadcastResult(WorkerBroadcastUsage worker) {
+      return createReduceOperation().createInitialValue().get();
+    }
+
+    @Override
+    public final R getBroadcast(WorkerBroadcastUsage worker) {
+      KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
+          createBroadcastResult(worker));
+      ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
+          createReduceOperation();
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        // Keep the value returned by reduceMerge, as in getReducedValue
+        ret = reduceOperation.reduceMerge(ret,
+            broadcasts.get(i).getBroadcast(worker));
+      }
+      return ret.get();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
new file mode 100644
index 0000000..dc640f7
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducers for distributed collection of objects.
+ */ +package org.apache.giraph.block_app.reducers.collect; http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java new file mode 100644 index 0000000..0e1e113 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.giraph.block_app.reducers.map;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.WritableWriter;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Efficient generic primitive map of values reduce operation.
+ * (it is BasicMap Reduce, not to be confused with MapReduce)
+ *
+ * Values reduced under the same key are combined with an element
+ * ReduceOperation; keys that were never reduced into are absent from
+ * the map.
+ *
+ * @param <K> Key type
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class BasicMapReduce<K extends WritableComparable, S,
+    R extends Writable>
+    implements ReduceOperation<Pair<K, S>, Basic2ObjectMap<K, R>> {
+  /** TypeOps of keys */
+  private PrimitiveIdTypeOps<K> keyTypeOps;
+  /** TypeOps of individual reduced elements */
+  private PrimitiveTypeOps<R> typeOps;
+  /** ReduceOperation applied to values sharing the same key */
+  private ReduceOperation<S, R> elementReduceOp;
+  /** Serializer for map values; rebuilt by init() */
+  private WritableWriter<R> writer;
+
+  /** For reflection only */
+  public BasicMapReduce() {
+  }
+
+  /**
+   * Create ReduceOperation that reduces BasicMaps by reducing individual
+   * elements corresponding to the same key.
+   *
+   * @param keyTypeOps TypeOps of keys
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public BasicMapReduce(
+      PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp) {
+    this.keyTypeOps = keyTypeOps;
+    this.typeOps = typeOps;
+    this.elementReduceOp = elementReduceOp;
+    init();
+  }
+
+  /**
+   * Registers one new local reducer, that will reduce BasicMap,
+   * by reducing individual elements corresponding to the same key
+   * using {@code elementReduceOp}.
+   *
+   * This function will return ReducerMapHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param keyTypeOps TypeOps of keys
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi API for creating reducers
+   * @return Created ReducerMapHandle
+   */
+  public static <K extends WritableComparable, S, R extends Writable>
+  ReducerMapHandle<K, S, R> createLocalMapHandles(
+      PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi) {
+    return createMapHandles(
+        keyTypeOps, typeOps, elementReduceOp,
+        new CreateReducerFunctionApi() {
+          @Override
+          public <S, R extends Writable> ReducerHandle<S, R> createReducer(
+              ReduceOperation<S, R> reduceOp) {
+            return reduceApi.createLocalReducer(reduceOp);
+          }
+        });
+  }
+
+  /**
+   * Registers one new reducer, that will reduce BasicMap,
+   * by reducing individual elements corresponding to the same key
+   * using {@code elementReduceOp}.
+   *
+   * This function will return ReducerMapHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * Note: {@code get(key)} on the returned handle (and on its broadcast)
+   * only records the key in a shared cursor and returns the same element
+   * handle every time, so an element handle must be used before the next
+   * {@code get} call. Element values returned by it are reused objects
+   * that are overwritten on the next read.
+   *
+   * @param keyTypeOps TypeOps of keys
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerMapHandle
+   */
+  public static <K extends WritableComparable, S, R extends Writable>
+  ReducerMapHandle<K, S, R> createMapHandles(
+      final PrimitiveIdTypeOps<K> keyTypeOps, final PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    final ReducerHandle<Pair<K, S>, Basic2ObjectMap<K, R>> reduceHandle =
+        createFunction.createReducer(
+            new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp));
+    // Key selected by the most recent get(key); read by elementReduceHandle
+    final K curIndex = keyTypeOps.create();
+    // Returned from getReducedValue and overwritten on every call
+    final R reusableValue = typeOps.create();
+    // Reported for keys that were never reduced into
+    final R initialValue = elementReduceOp.createInitialValue();
+    final MutablePair<K, S> reusablePair = MutablePair.of(null, null);
+    final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
+      @Override
+      public R getReducedValue(MasterGlobalCommUsage master) {
+        Basic2ObjectMap<K, R> result = reduceHandle.getReducedValue(master);
+        R value = result.get(curIndex);
+        if (value == null) {
+          typeOps.set(reusableValue, initialValue);
+        } else {
+          typeOps.set(reusableValue, value);
+        }
+        return reusableValue;
+      }
+
+      @Override
+      public void reduce(S valueToReduce) {
+        reusablePair.setLeft(curIndex);
+        reusablePair.setRight(valueToReduce);
+        reduceHandle.reduce(reusablePair);
+      }
+
+      @Override
+      public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    return new ReducerMapHandle<K, S, R>() {
+      @Override
+      public ReducerHandle<S, R> get(K key) {
+        keyTypeOps.set(curIndex, key);
+        return elementReduceHandle;
+      }
+
+      @Override
+      public int getReducedSize(BlockMasterApi master) {
+        return reduceHandle.getReducedValue(master).size();
+      }
+
+      @Override
+      public BroadcastMapHandle<K, R> broadcastValue(BlockMasterApi master) {
+        final BroadcastHandle<Basic2ObjectMap<K, R>> broadcastHandle =
+            reduceHandle.broadcastValue(master);
+        // Separate cursor and reusable value for the broadcast side
+        final K curIndex = keyTypeOps.create();
+        final R reusableValue = typeOps.create();
+        final BroadcastHandle<R>
+        elementBroadcastHandle = new BroadcastHandle<R>() {
+          @Override
+          public R getBroadcast(WorkerBroadcastUsage worker) {
+            Basic2ObjectMap<K, R> result = broadcastHandle.getBroadcast(worker);
+            R value = result.get(curIndex);
+            if (value == null) {
+              typeOps.set(reusableValue, initialValue);
+            } else {
+              typeOps.set(reusableValue, value);
+            }
+            return reusableValue;
+          }
+        };
+        return new BroadcastMapHandle<K, R>() {
+          @Override
+          public BroadcastHandle<R> get(K key) {
+            keyTypeOps.set(curIndex, key);
+            return elementBroadcastHandle;
+          }
+
+          @Override
+          public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+            return broadcastHandle.getBroadcast(worker).size();
+          }
+        };
+      }
+    };
+  }
+
+  /** Build the value serializer from {@link #typeOps}. */
+  private void init() {
+    writer = new WritableWriter<R>() {
+      @Override
+      public void write(DataOutput out, R value) throws IOException {
+        value.write(out);
+      }
+
+      @Override
+      public R readFields(DataInput in) throws IOException {
+        R result = typeOps.create();
+        result.readFields(in);
+        return result;
+      }
+    };
+  }
+
+  @Override
+  public Basic2ObjectMap<K, R> createInitialValue() {
+    return keyTypeOps.create2ObjectOpenHashMap(writer);
+  }
+
+  @Override
+  public Basic2ObjectMap<K, R> reduce(
+      Basic2ObjectMap<K, R> curValue, Pair<K, S> valueToReduce) {
+    R result = curValue.get(valueToReduce.getLeft());
+    if (result == null) {
+      // Start from the element operation's initial value rather than a
+      // default-constructed one, so that e.g. max/min reductions are
+      // correct and absent keys agree with what the handles report
+      result = elementReduceOp.createInitialValue();
+    }
+    result = elementReduceOp.reduce(result, valueToReduce.getRight());
+    curValue.put(valueToReduce.getLeft(), result);
+    return curValue;
+  }
+
+  @Override
+  public Basic2ObjectMap<K, R> reduceMerge(
+      Basic2ObjectMap<K, R> curValue, Basic2ObjectMap<K, R> valueToReduce) {
+    for (Iterator<K> iter = valueToReduce.fastKeyIterator(); iter.hasNext();) {
+      K key = iter.next();
+
+      R result = curValue.get(key);
+      if (result == null) {
+        // Same seeding as in reduce()
+        result = elementReduceOp.createInitialValue();
+      }
+      result = elementReduceOp.reduceMerge(result, valueToReduce.get(key));
+      curValue.put(key, result);
+    }
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TypeOpsUtils.writeTypeOps(keyTypeOps, out);
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+    WritableUtils.writeWritableObject(elementReduceOp, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    keyTypeOps = TypeOpsUtils.readTypeOps(in);
+    typeOps = TypeOpsUtils.readTypeOps(in);
+    elementReduceOp = WritableUtils.readWritableObject(in, null);
+    init();
+  }
+}
