Repository: giraph
Updated Branches:
  refs/heads/trunk 1e802da3a -> ca36f1d49


http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
deleted file mode 100644
index 0ee8d92..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Lists;
-import org.apache.log4j.Logger;
-
-/**
- * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
- * user function - getPartitionIndex.
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements WorkerGraphPartitioner<I, V, E> {
-  /** Logger instance */
-  private static final Logger LOG = Logger.getLogger(
-      SimpleWorkerPartitioner.class);
-  /** List of {@link PartitionOwner}s for this worker. */
-  private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
-  /** List of available workers */
-  private Set<WorkerInfo> availableWorkers = new HashSet<>();
-
-  @Override
-  public PartitionOwner createPartitionOwner() {
-    return new BasicPartitionOwner();
-  }
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    return partitionOwnerList.get(
-        getPartitionIndex(vertexId, partitionOwnerList.size(),
-            availableWorkers.size()));
-  }
-
-  @Override
-  public Collection<PartitionStats> finalizePartitionStats(
-      Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E> partitionStore) {
-    // No modification necessary
-    return workerPartitionStats;
-  }
-
-  @Override
-  public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
-    PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
-        partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
-    extractAvailableWorkers();
-    return exchange;
-  }
-
-  @Override
-  public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return partitionOwnerList;
-  }
-
-  /**
-   * Update availableWorkers
-   */
-  public void extractAvailableWorkers() {
-    availableWorkers.clear();
-    for (PartitionOwner partitionOwner : partitionOwnerList) {
-      availableWorkers.add(partitionOwner.getWorkerInfo());
-    }
-    LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
-        " workers are available");
-  }
-
-  /**
-   * Calculates in which partition current vertex belongs to,
-   * from interval [0, partitionCount).
-   *
-   * @param id Vertex id
-   * @param partitionCount Number of partitions
-   * @param workerCount Number of active workers
-   * @return partition
-   */
-  protected abstract int getPartitionIndex(I id, int partitionCount,
-    int workerCount);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java
new file mode 100644
index 0000000..2087181
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
+
+/**
+ * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
+ * user function - getPartitionIndex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class WorkerGraphPartitionerImpl<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements WorkerGraphPartitioner<I, V, E> {
+  /** Logger instance */
+  private static final Logger LOG = Logger.getLogger(
+      WorkerGraphPartitionerImpl.class);
+  /** List of {@link PartitionOwner}s for this worker. */
+  private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+  /** List of available workers */
+  private Set<WorkerInfo> availableWorkers = new HashSet<>();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new BasicPartitionOwner();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    return partitionOwnerList.get(
+        getPartitionIndex(vertexId, partitionOwnerList.size(),
+            availableWorkers.size()));
+  }
+
+  @Override
+  public Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E> partitionStore) {
+    // No modification necessary
+    return workerPartitionStats;
+  }
+
+  @Override
+  public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+    PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
+        partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
+    extractAvailableWorkers();
+    return exchange;
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  /**
+   * Update availableWorkers
+   */
+  public void extractAvailableWorkers() {
+    availableWorkers.clear();
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      availableWorkers.add(partitionOwner.getWorkerInfo());
+    }
+    LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
+        " workers are available");
+  }
+
+  /**
+   * Calculates in which partition current vertex belongs to,
+   * from interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of active workers
+   * @return partition
+   */
+  protected abstract int getPartitionIndex(I id, int partitionCount,
+    int workerCount);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
index 478a33b..49602a1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
@@ -41,6 +41,7 @@ import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -106,6 +107,9 @@ public class HadoopKryo extends Kryo {
         Random.class,
         "it should be rarely serialized, since it would create same stream " +
         "of numbers everywhere, use TransientRandom instead");
+    NON_SERIALIZABLE.put(
+        Logger.class,
+        "Logger must be a static field");
   }
 
   // Use chunked streams, so within same stream we can use both kryo and

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index bf87491..a3c95bb 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -18,13 +18,17 @@
 
 package org.apache.giraph;
 
+import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleCheckpoint;
 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.partition.HashRangePartitionerFactory;
 import org.apache.giraph.partition.PartitionBalancer;
@@ -34,12 +38,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
-import java.io.IOException;
-
-import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Unit test for manual checkpoint restarting
  */
@@ -108,24 +106,6 @@ public class TestGraphPartitioner extends BspCase {
     assertTrue(job.run(true));
     verifyOutput(hdfs, outputPath);
 
-    outputPath = getTempPath("testSuperstepHashPartitioner");
-    conf = new GiraphConfiguration();
-    conf.setComputationClass(
-        SimpleCheckpoint.SimpleCheckpointComputation.class);
-    conf.setWorkerContextClass(
-        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
-    conf.setMasterComputeClass(
-        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
-    conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-    job = prepareJob("testSuperstepHashPartitioner", conf, outputPath);
-
-    job.getConfiguration().setGraphPartitionerFactoryClass(
-        SuperstepHashPartitionerFactory.class);
-
-    assertTrue(job.run(true));
-    verifyOutput(hdfs, outputPath);
-
     job = new GiraphJob("testHashRangePartitioner");
     setupConfiguration(job);
     job.getConfiguration().setComputationClass(
@@ -145,24 +125,6 @@ public class TestGraphPartitioner extends BspCase {
     assertTrue(job.run(true));
     verifyOutput(hdfs, outputPath);
 
-    outputPath = getTempPath("testReverseIdSuperstepHashPartitioner");
-    conf = new GiraphConfiguration();
-    conf.setComputationClass(
-        SimpleCheckpoint.SimpleCheckpointComputation.class);
-    conf.setWorkerContextClass(
-        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
-    conf.setMasterComputeClass(
-        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
-    conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-    job = prepareJob("testReverseIdSuperstepHashPartitioner", conf,
-        outputPath);
-    job.getConfiguration().setGraphPartitionerFactoryClass(
-        SuperstepHashPartitionerFactory.class);
-    GeneratedVertexReader.REVERSE_ID_ORDER.set(job.getConfiguration(), true);
-    assertTrue(job.run(true));
-    verifyOutput(hdfs, outputPath);
-
     job = new GiraphJob("testSimpleRangePartitioner");
     setupConfiguration(job);
     job.getConfiguration().setComputationClass(

Reply via email to