Repository: giraph Updated Branches: refs/heads/trunk 1e802da3a -> ca36f1d49
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java deleted file mode 100644 index 0ee8d92..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.google.common.collect.Lists; -import org.apache.log4j.Logger; - -/** - * Abstracts and implements all WorkerGraphPartitioner logic on top of a single - * user function - getPartitionIndex. - * - * @param <I> Vertex id type - * @param <V> Vertex value type - * @param <E> Edge value type - */ -public abstract class SimpleWorkerPartitioner<I extends WritableComparable, - V extends Writable, E extends Writable> - implements WorkerGraphPartitioner<I, V, E> { - /** Logger instance */ - private static final Logger LOG = Logger.getLogger( - SimpleWorkerPartitioner.class); - /** List of {@link PartitionOwner}s for this worker. */ - private List<PartitionOwner> partitionOwnerList = Lists.newArrayList(); - /** List of available workers */ - private Set<WorkerInfo> availableWorkers = new HashSet<>(); - - @Override - public PartitionOwner createPartitionOwner() { - return new BasicPartitionOwner(); - } - - @Override - public PartitionOwner getPartitionOwner(I vertexId) { - return partitionOwnerList.get( - getPartitionIndex(vertexId, partitionOwnerList.size(), - availableWorkers.size())); - } - - @Override - public Collection<PartitionStats> finalizePartitionStats( - Collection<PartitionStats> workerPartitionStats, - PartitionStore<I, V, E> partitionStore) { - // No modification necessary - return workerPartitionStats; - } - - @Override - public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo, - Collection<? extends PartitionOwner> masterSetPartitionOwners) { - PartitionExchange exchange = PartitionBalancer.updatePartitionOwners( - partitionOwnerList, myWorkerInfo, masterSetPartitionOwners); - extractAvailableWorkers(); - return exchange; - } - - @Override - public Collection<? extends PartitionOwner> getPartitionOwners() { - return partitionOwnerList; - } - - /** - * Update availableWorkers - */ - public void extractAvailableWorkers() { - availableWorkers.clear(); - for (PartitionOwner partitionOwner : partitionOwnerList) { - availableWorkers.add(partitionOwner.getWorkerInfo()); - } - LOG.info("After updating partitionOwnerList " + availableWorkers.size() + - " workers are available"); - } - - /** - * Calculates in which partition current vertex belongs to, - * from interval [0, partitionCount). - * - * @param id Vertex id - * @param partitionCount Number of partitions - * @param workerCount Number of active workers - * @return partition - */ - protected abstract int getPartitionIndex(I id, int partitionCount, - int workerCount); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java new file mode 100644 index 0000000..2087181 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.Lists; +import org.apache.log4j.Logger; + +/** + * Abstracts and implements all WorkerGraphPartitioner logic on top of a single + * user function - getPartitionIndex. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public abstract class WorkerGraphPartitionerImpl<I extends WritableComparable, + V extends Writable, E extends Writable> + implements WorkerGraphPartitioner<I, V, E> { + /** Logger instance */ + private static final Logger LOG = Logger.getLogger( + WorkerGraphPartitionerImpl.class); + /** List of {@link PartitionOwner}s for this worker. */ + private List<PartitionOwner> partitionOwnerList = Lists.newArrayList(); + /** List of available workers */ + private Set<WorkerInfo> availableWorkers = new HashSet<>(); + + @Override + public PartitionOwner createPartitionOwner() { + return new BasicPartitionOwner(); + } + + @Override + public PartitionOwner getPartitionOwner(I vertexId) { + return partitionOwnerList.get( + getPartitionIndex(vertexId, partitionOwnerList.size(), + availableWorkers.size())); + } + + @Override + public Collection<PartitionStats> finalizePartitionStats( + Collection<PartitionStats> workerPartitionStats, + PartitionStore<I, V, E> partitionStore) { + // No modification necessary + return workerPartitionStats; + } + + @Override + public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo, + Collection<? extends PartitionOwner> masterSetPartitionOwners) { + PartitionExchange exchange = PartitionBalancer.updatePartitionOwners( + partitionOwnerList, myWorkerInfo, masterSetPartitionOwners); + extractAvailableWorkers(); + return exchange; + } + + @Override + public Collection<? extends PartitionOwner> getPartitionOwners() { + return partitionOwnerList; + } + + /** + * Update availableWorkers + */ + public void extractAvailableWorkers() { + availableWorkers.clear(); + for (PartitionOwner partitionOwner : partitionOwnerList) { + availableWorkers.add(partitionOwner.getWorkerInfo()); + } + LOG.info("After updating partitionOwnerList " + availableWorkers.size() + + " workers are available"); + } + + /** + * Calculates in which partition current vertex belongs to, + * from interval [0, partitionCount). + * + * @param id Vertex id + * @param partitionCount Number of partitions + * @param workerCount Number of active workers + * @return partition + */ + protected abstract int getPartitionIndex(I id, int partitionCount, + int workerCount); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java index 478a33b..49602a1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java @@ -41,6 +41,7 @@ import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; import org.objenesis.strategy.StdInstantiatorStrategy; import com.esotericsoftware.kryo.Kryo; @@ -106,6 +107,9 @@ public class HadoopKryo extends Kryo { Random.class, "it should be rarely serialized, since it would create same stream " + "of numbers everywhere, use TransientRandom instead"); + NON_SERIALIZABLE.put( + Logger.class, + "Logger must be a static field"); } // Use chunked streams, so within same stream we can use both kryo and http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java index bf87491..a3c95bb 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java @@ -18,13 +18,17 @@ package org.apache.giraph; +import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.examples.SimpleCheckpoint; import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat; import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat; -import org.apache.giraph.integration.SuperstepHashPartitionerFactory; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.partition.HashRangePartitionerFactory; import org.apache.giraph.partition.PartitionBalancer; @@ -34,12 +38,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; -import java.io.IOException; - -import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** * Unit test for manual checkpoint restarting */ @@ -108,24 +106,6 @@ public class TestGraphPartitioner extends BspCase { assertTrue(job.run(true)); verifyOutput(hdfs, outputPath); - outputPath = getTempPath("testSuperstepHashPartitioner"); - conf = new GiraphConfiguration(); - conf.setComputationClass( - SimpleCheckpoint.SimpleCheckpointComputation.class); - conf.setWorkerContextClass( - SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class); - conf.setMasterComputeClass( - SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class); - conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - job = prepareJob("testSuperstepHashPartitioner", conf, outputPath); - - job.getConfiguration().setGraphPartitionerFactoryClass( - SuperstepHashPartitionerFactory.class); - - assertTrue(job.run(true)); - verifyOutput(hdfs, outputPath); - job = new GiraphJob("testHashRangePartitioner"); setupConfiguration(job); job.getConfiguration().setComputationClass( @@ -145,24 +125,6 @@ public class TestGraphPartitioner extends BspCase { assertTrue(job.run(true)); verifyOutput(hdfs, outputPath); - outputPath = getTempPath("testReverseIdSuperstepHashPartitioner"); - conf = new GiraphConfiguration(); - conf.setComputationClass( - SimpleCheckpoint.SimpleCheckpointComputation.class); - conf.setWorkerContextClass( - SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class); - conf.setMasterComputeClass( - SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class); - conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - job = prepareJob("testReverseIdSuperstepHashPartitioner", conf, - outputPath); - job.getConfiguration().setGraphPartitionerFactoryClass( - SuperstepHashPartitionerFactory.class); - GeneratedVertexReader.REVERSE_ID_ORDER.set(job.getConfiguration(), true); - assertTrue(job.run(true)); - verifyOutput(hdfs, outputPath); - job = new GiraphJob("testSimpleRangePartitioner"); setupConfiguration(job); job.getConfiguration().setComputationClass(
