Updated Branches: refs/heads/trunk 507959dcb -> c33ea10c4
Add missing files from GIRAPH-535 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c33ea10c Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c33ea10c Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c33ea10c Branch: refs/heads/trunk Commit: c33ea10c40d0712352171b8f086dd4fcb70a43c3 Parents: 507959d Author: Alessandro Presta <[email protected]> Authored: Mon Feb 25 16:14:20 2013 -0800 Committer: Alessandro Presta <[email protected]> Committed: Mon Feb 25 16:14:20 2013 -0800 ---------------------------------------------------------------------- .../formats/PseudoRandomInputFormatConstants.java | 39 +++++ .../io/formats/PseudoRandomLocalEdgesHelper.java | 96 ++++++++++++ .../SimpleIntRangePartitionerFactory.java | 78 ++++++++++ .../SimpleLongRangePartitionerFactory.java | 78 ++++++++++ .../partition/SimpleRangeMasterPartitioner.java | 117 +++++++++++++++ .../partition/SimpleRangeWorkerPartitioner.java | 109 ++++++++++++++ 6 files changed, 517 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java new file mode 100644 index 0000000..3497de4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +/** + * Contains constants for configuring pseudo-random input formats. + */ +public class PseudoRandomInputFormatConstants { + /** Set the number of aggregate vertices. */ + public static final String AGGREGATE_VERTICES = + "giraph.pseudoRandomInputFormat.aggregateVertices"; + /** Set the number of edges per vertex (pseudo-random destination). */ + public static final String EDGES_PER_VERTEX = + "giraph.pseudoRandomInputFormat.edgesPerVertex"; + /** Minimum ratio of partition-local edges. */ + public static final String LOCAL_EDGES_MIN_RATIO = + "giraph.pseudoRandomInputFormat.localEdgesMinRatio"; + /** Default minimum ratio of partition-local edges. */ + public static final float LOCAL_EDGES_MIN_RATIO_DEFAULT = 0; + + /** Do not construct. 
*/ + private PseudoRandomInputFormatConstants() { } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java new file mode 100644 index 0000000..84502e1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionUtils; +import org.apache.giraph.partition.SimpleLongRangePartitionerFactory; +import org.apache.giraph.worker.WorkerInfo; + +import java.util.Collections; +import java.util.List; +import java.util.Random; + +/** + * Helper class to generate pseudo-random local edges. + */ +public class PseudoRandomLocalEdgesHelper { + /** Minimum ratio of partition-local edges. 
*/ + private float minLocalEdgesRatio; + /** Whether we're using range-partitioning or hash-partitioning */ + private boolean usingRangePartitioner; + /** Total number of vertices. */ + private long numVertices; + /** Total number of partitions. */ + private int numPartitions; + /** Average partition size. */ + private long partitionSize; + + /** + * Constructor. + * + * @param numVertices Total number of vertices. + * @param minLocalEdgesRatio Minimum ratio of local edges. + * @param conf Configuration. + */ + public PseudoRandomLocalEdgesHelper(long numVertices, + float minLocalEdgesRatio, + ImmutableClassesGiraphConfiguration conf) + { + this.minLocalEdgesRatio = minLocalEdgesRatio; + this.numVertices = numVertices; + usingRangePartitioner = + SimpleLongRangePartitionerFactory.class.isAssignableFrom( + conf.getGraphPartitionerClass()); + int numWorkers = conf.getMaxWorkers(); + List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers, + new WorkerInfo()); + numPartitions = PartitionUtils.computePartitionCount(workerInfos, + numWorkers, conf); + partitionSize = numVertices / numPartitions; + } + + /** + * Generate a destination vertex id for the given source vertex, + * using the desired configuration for edge locality and the provided + * pseudo-random generator. + * + * @param sourceVertexId Source vertex id. + * @param rand Pseudo-random generator. + * @return Destination vertex id. 
+ */ + public long generateDestVertex(long sourceVertexId, Random rand) { + long destVertexId; + if (rand.nextFloat() < minLocalEdgesRatio) { + if (usingRangePartitioner) { + int partitionId = Math.min(numPartitions - 1, + (int) (sourceVertexId / partitionSize)); + destVertexId = partitionId * partitionSize + + (Math.abs(rand.nextLong()) % partitionSize); + } else { + int partitionId = (int) sourceVertexId % numPartitions; + destVertexId = partitionId + + numPartitions * (Math.abs(rand.nextLong()) % partitionSize); + } + } else { + destVertexId = Math.abs(rand.nextLong()) % numVertices; + } + return destVertexId; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java new file mode 100644 index 0000000..9ac2e11 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; + +/** + * Factory for simple range-based partitioners based on integer vertex ids. + * Workers are assigned equal-sized ranges of partitions, + * and partitions are assigned equal-sized ranges of vertices. + * + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message data type + */ +public class SimpleIntRangePartitionerFactory<V extends Writable, + E extends Writable, M extends Writable> + implements GraphPartitionerFactory<IntWritable, V, E, M> { + /** Configuration. */ + private ImmutableClassesGiraphConfiguration conf; + /** Vertex key space size. */ + private long keySpaceSize; + + @Override + public MasterGraphPartitioner<IntWritable, V, E, M> + createMasterGraphPartitioner() { + return new SimpleRangeMasterPartitioner<IntWritable, V, E, M>(conf); + } + + @Override + public WorkerGraphPartitioner<IntWritable, V, E, M> + createWorkerGraphPartitioner() { + return new SimpleRangeWorkerPartitioner<IntWritable, V, E, M>( + keySpaceSize) { + @Override + protected long vertexKeyFromId(IntWritable id) { + // The modulo is just a safeguard in case keySpaceSize is incorrect. 
+ return id.get() % keySpaceSize; + } + }; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, + -1); + if (keySpaceSize == -1) { + throw new IllegalStateException("Need to specify " + GiraphConstants + .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " + + "SimpleRangePartitioner"); + } + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java new file mode 100644 index 0000000..5772a7b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.partition; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +/** + * Factory for simple range-based partitioners based on long vertex ids. + * Workers are assigned equal-sized ranges of partitions, + * and partitions are assigned equal-sized ranges of vertices. + * + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message data type + */ +public class SimpleLongRangePartitionerFactory<V extends Writable, + E extends Writable, M extends Writable> + implements GraphPartitionerFactory<LongWritable, V, E, M> { + /** Configuration. */ + private ImmutableClassesGiraphConfiguration conf; + /** Vertex key space size. */ + private long keySpaceSize; + + @Override + public MasterGraphPartitioner<LongWritable, V, E, M> + createMasterGraphPartitioner() { + return new SimpleRangeMasterPartitioner<LongWritable, V, E, M>(conf); + } + + @Override + public WorkerGraphPartitioner<LongWritable, V, E, M> + createWorkerGraphPartitioner() { + return new SimpleRangeWorkerPartitioner<LongWritable, V, E, M>( + keySpaceSize) { + @Override + protected long vertexKeyFromId(LongWritable id) { + // The modulo is just a safeguard in case keySpaceSize is incorrect. 
+ return id.get() % keySpaceSize; + } + }; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, + -1); + if (keySpaceSize == -1) { + throw new IllegalStateException("Need to specify " + GiraphConstants + .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " + + "SimpleRangePartitioner"); + } + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java new file mode 100644 index 0000000..bf34ecd --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.partition; + + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * A range-based master partitioner where equal-sized ranges of partitions + * are deterministically assigned to workers. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message data type + */ +public class SimpleRangeMasterPartitioner<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> implements + MasterGraphPartitioner<I, V, E, M> { + /** Class logger */ + private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); + /** Provided configuration */ + private ImmutableClassesGiraphConfiguration conf; + /** Save the last generated partition owner list */ + private List<PartitionOwner> partitionOwnerList; + + /** + * Constructor. + * + * @param conf Configuration used. 
+ */ + public SimpleRangeMasterPartitioner( + ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + } + + @Override + public Collection<PartitionOwner> createInitialPartitionOwners( + Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) { + int partitionCount = PartitionUtils.computePartitionCount( + availableWorkerInfos, maxWorkers, conf); + int rangeSize = partitionCount / availableWorkerInfos.size(); + + partitionOwnerList = new ArrayList<PartitionOwner>(); + Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator(); + WorkerInfo currentWorker = null; + + int i = 0; + for (; i < partitionCount; ++i) { + if (i % rangeSize == 0) { + if (!workerIt.hasNext()) { + break; + } + currentWorker = workerIt.next(); + } + partitionOwnerList.add(new BasicPartitionOwner(i, currentWorker)); + } + + // Distribute the remainder among all workers. + if (i < partitionCount) { + workerIt = availableWorkerInfos.iterator(); + for (; i < partitionCount; ++i) { + partitionOwnerList.add(new BasicPartitionOwner(i, workerIt.next())); + } + } + + return partitionOwnerList; + } + + @Override + public Collection<PartitionOwner> generateChangedPartitionOwners( + Collection<PartitionStats> allPartitionStatsList, + Collection<WorkerInfo> availableWorkers, + int maxWorkers, + long superstep) { + return PartitionBalancer.balancePartitionsAcrossWorkers( + conf, + partitionOwnerList, + allPartitionStatsList, + availableWorkers); + } + + @Override + public Collection<PartitionOwner> getCurrentPartitionOwners() { + return partitionOwnerList; + } + + @Override + public PartitionStats createPartitionStats() { + return new PartitionStats(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java new file mode 100644 index 0000000..f94c14b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import com.google.common.collect.Lists; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.util.Collection; +import java.util.List; + +/** + * A range-based worker partitioner where equal-sized ranges of vertex ids + * are deterministically assigned to partitions. + * The user has to define a mapping from vertex ids to long keys dense in + * [0, keySpaceSize). + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message data type + */ +public abstract class SimpleRangeWorkerPartitioner<I extends + WritableComparable, V extends Writable, E extends Writable, + M extends Writable> + implements WorkerGraphPartitioner<I, V, E, M> { + /** List of {@link PartitionOwner}s for this worker. 
*/ + private List<PartitionOwner> partitionOwnerList = Lists.newArrayList(); + /** Vertex keys space size. */ + private long keySpaceSize; + + /** + * Constructor. + * + * @param keySpaceSize Vertex keys space size. + */ + public SimpleRangeWorkerPartitioner(long keySpaceSize) { + this.keySpaceSize = keySpaceSize; + } + + /** + * Get key space size (can be used when implementing vertexKeyFromId()). + * + * @return Key space size. + */ + public long getKeySpaceSize() { + return keySpaceSize; + } + + /** + * Convert a vertex id to a unique long key in [0, keySpaceSize). + * + * @param id Vertex id + * @return Unique long key + */ + protected abstract long vertexKeyFromId(I id); + + @Override + public PartitionOwner createPartitionOwner() { + return new BasicPartitionOwner(); + } + + @Override + public PartitionOwner getPartitionOwner(I vertexId) { + long rangeSize = keySpaceSize / partitionOwnerList.size(); + return partitionOwnerList.get( + Math.min((int) (vertexKeyFromId(vertexId) / rangeSize), + partitionOwnerList.size() - 1)); + } + + @Override + public Collection<PartitionStats> finalizePartitionStats( + Collection<PartitionStats> workerPartitionStats, + PartitionStore<I, V, E, M> partitionStore) { + // No modification necessary + return workerPartitionStats; + } + + @Override + public PartitionExchange updatePartitionOwners( + WorkerInfo myWorkerInfo, + Collection<? extends PartitionOwner> masterSetPartitionOwners, + PartitionStore<I, V, E, M> partitionStore) { + return PartitionBalancer.updatePartitionOwners(partitionOwnerList, + myWorkerInfo, masterSetPartitionOwners, partitionStore); + } + + @Override + public Collection<? extends PartitionOwner> getPartitionOwners() { + return partitionOwnerList; + } +}
