Updated Branches:
  refs/heads/trunk 56fcb519a -> 2430ec5f2

GIRAPH-579


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2430ec5f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2430ec5f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2430ec5f

Branch: refs/heads/trunk
Commit: 2430ec5f231f572b780dc0cb0053f409358df155
Parents: 56fcb51
Author: Alessandro Presta <[email protected]>
Authored: Thu Mar 21 11:43:07 2013 -0700
Committer: Alessandro Presta <[email protected]>
Committed: Tue Mar 26 14:52:15 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../java/org/apache/giraph/conf/GiraphClasses.java |   32 ++++-
 .../apache/giraph/conf/GiraphConfiguration.java    |   12 ++
 .../org/apache/giraph/conf/GiraphConstants.java    |    2 +
 .../conf/ImmutableClassesGiraphConfiguration.java  |   40 +++++
 .../java/org/apache/giraph/edge/EdgeStore.java     |  116 +++++++++++----
 .../giraph/job/GiraphConfigurationValidator.java   |   21 ++-
 .../apache/giraph/utils/ConfigurationUtils.java    |    5 +
 .../apache/giraph/utils/InternalVertexRunner.java  |    1 +
 .../java/org/apache/giraph/io/TestEdgeInput.java   |   62 +++++++-
 10 files changed, 256 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b695ae0..674d15e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-579: Make it possible to use different out-edges data structures
+  for input and computation (apresta)
+
   GIRAPH-582: Create a generic option for determining the number of
   supersteps that a job runs for (aching)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index d67d3a5..c13f3a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -71,6 +71,8 @@ public class GiraphClasses<I extends WritableComparable,
   protected Class<M> messageValueClass;
   /** Vertex edges class - cached for fast access */
   protected Class<? extends VertexEdges<I, E>> vertexEdgesClass;
+  /** Input vertex edges class - cached for fast access */
+  protected Class<? extends VertexEdges<I, E>> inputVertexEdgesClass;
 
   /** Vertex value factory class - cached for fast access */
   protected Class<? extends VertexValueFactory<V>> vertexValueFactoryClass;
@@ -114,6 +116,8 @@ public class GiraphClasses<I extends WritableComparable,
     // downcast.
     vertexEdgesClass = (Class<? extends VertexEdges<I, E>>) (Object)
         ByteArrayEdges.class;
+    inputVertexEdgesClass = (Class<? extends VertexEdges<I, E>>) (Object)
+        ByteArrayEdges.class;
     vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>) (Object)
         DefaultVertexValueFactory.class;
     graphPartitionerFactoryClass =
@@ -156,9 +160,12 @@ public class GiraphClasses<I extends WritableComparable,
     vertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
         conf.getClass(VERTEX_EDGES_CLASS, ByteArrayEdges.class,
             VertexEdges.class);
+    inputVertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
+        conf.getClass(INPUT_VERTEX_EDGES_CLASS, vertexEdgesClass,
+            VertexEdges.class);
     vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>)
         conf.getClass(VERTEX_VALUE_FACTORY_CLASS,
-            DefaultVertexValueFactory.class);
+            DefaultVertexValueFactory.class, VertexValueFactory.class);
 
     graphPartitionerFactoryClass =
         (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
@@ -248,6 +255,15 @@ public class GiraphClasses<I extends WritableComparable,
     return vertexEdgesClass;
   }
 
+  /**
+   * Get the vertex edges class used during edge-based input
+   *
+   * @return Input vertex edges class
+   */
+  public Class<? extends VertexEdges<I, E>> getInputVertexEdgesClass() {
+    return inputVertexEdgesClass;
+  }
+
   /**
    * Get vertex value factory class
    *
@@ -519,6 +535,20 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Set the VertexEdges class used to store edges during edge-based input,
+   * when it should differ from the one used for computation.
+   *
+   * @param inputVertexEdgesClass Input vertex edges class to set
+   * @return this
+   */
+  public GiraphClasses setInputVertexEdgesClass(
+      Class<? extends VertexEdges> inputVertexEdgesClass) {
+    this.inputVertexEdgesClass =
+        (Class<? extends VertexEdges<I, E>>) inputVertexEdgesClass;
+    return this;
+  }
+
+  /**
    * Set VertexValueFactory class held
    *
    * @param vertexValueFactoryClass Vertex value factory class to set

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 3b84831..ffcae6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -97,6 +97,18 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the vertex edges class used during edge-based input, if it should
+   * differ from the one used during computation.
+   *
+   * @param inputVertexEdgesClass Determines how edges are stored during input
+   */
+  public final void setInputVertexEdgesClass(
+      Class<? extends VertexEdges> inputVertexEdgesClass) {
+    setClass(INPUT_VERTEX_EDGES_CLASS, inputVertexEdgesClass,
+        VertexEdges.class);
+  }
+
+  /**
    * Set the vertex input format class (required)
    *
    * @param vertexInputFormatClass Determines how graph is input

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 42f8abc..7882d06 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -28,6 +28,8 @@ public interface GiraphConstants {
   String VERTEX_VALUE_FACTORY_CLASS = "giraph.vertexValueFactoryClass";
   /** Vertex edges class - optional */
   String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass";
+  /** Vertex edges class to be used during edge input only - optional */
+  String INPUT_VERTEX_EDGES_CLASS = "giraph.inputVertexEdgesClass";
 
   /** Class for Master - optional */
   String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
 
b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2a3466d..de85ab6 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -575,6 +575,25 @@ public class ImmutableClassesGiraphConfiguration<I extends 
WritableComparable,
   }
 
   /**
+   * Get the user's {@link VertexEdges} class used during edge-based input
+   *
+   * @return User's input vertex edges class
+   */
+  public Class<? extends VertexEdges<I, E>> getInputVertexEdgesClass() {
+    return classes.getInputVertexEdgesClass();
+  }
+
+  /**
+   * Check whether the {@link VertexEdges} class used during edge-based
+   * input differs from the one used for computation.
+   *
+   * @return True iff the input and computation edges classes differ
+   */
+  public boolean useInputVertexEdges() {
+    return classes.getInputVertexEdgesClass() != classes.getVertexEdgesClass();
+  }
+
+  /**
    * True if the {@link VertexEdges} implementation copies the passed edges
    * to its own data structure, i.e. it doesn't keep references to Edge
    * objects, target vertex ids or edge values passed to add() or
@@ -639,6 +658,27 @@ public class ImmutableClassesGiraphConfiguration<I extends 
WritableComparable,
   }
 
   /**
+   * Create a new, not yet initialized, input {@link VertexEdges} instance
+   *
+   * @return Instantiated (uninitialized) user input VertexEdges
+   */
+  public VertexEdges<I, E> createInputVertexEdges() {
+    return ReflectionUtils.newInstance(getInputVertexEdgesClass(), this);
+  }
+
+  /**
+   * Create an input {@link VertexEdges} instance and initialize it by
+   * calling its no-argument initialize() (default capacity).
+   *
+   * @return Instantiated and initialized input VertexEdges
+   */
+  public VertexEdges<I, E> createAndInitializeInputVertexEdges() {
+    VertexEdges<I, E> vertexEdges = createInputVertexEdges();
+    vertexEdges.initialize();
+    return vertexEdges;
+  }
+
+  /**
    * Create a partition
    *
    * @param id Partition id

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 234c267..3101211 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -19,18 +19,28 @@
 package org.apache.giraph.edge;

+import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;

-import java.util.Map;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;

 /**
  * Collects incoming edges for vertices owned by this worker.
@@ -58,6 +64,11 @@ public class EdgeStore<I extends WritableComparable,
    * reuse.
    */
   private boolean reuseEdgeObjects;
+  /**
+   * Whether the {@link VertexEdges} class used during input is different
+   * from the one used during computation.
+   */
+  private boolean useInputVertexEdges;
 
   /**
    * Constructor.
@@ -76,6 +87,7 @@ public class EdgeStore<I extends WritableComparable,
     transientEdges = new MapMaker().concurrencyLevel(
         configuration.getNettyServerExecutionConcurrency()).makeMap();
     reuseEdgeObjects = configuration.reuseEdgeObjects();
+    useInputVertexEdges = configuration.useInputVertexEdges();
   }
 
   /**
@@ -110,7 +122,7 @@ public class EdgeStore<I extends WritableComparable,
       VertexEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
       if (vertexEdges == null) {
         VertexEdges<I, E> newVertexEdges =
-            configuration.createAndInitializeVertexEdges();
+            configuration.createAndInitializeInputVertexEdges();
         vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
         if (vertexEdges == null) {
           vertexEdges = newVertexEdges;
@@ -126,6 +138,27 @@ public class EdgeStore<I extends WritableComparable,
   }
 
   /**
+   * Copy input edges into a new computation {@link VertexEdges}, or return
+   * them unchanged if both classes are the same. NOTE(review): relies on
+   * add() not keeping refs to Edge objects the input iterator may reuse.
+   * @param inputEdges Input edges
+   * @return Compute edges (possibly {@code inputEdges} itself)
+   */
+  private VertexEdges<I, E> convertInputToComputeEdges(
+      VertexEdges<I, E> inputEdges) {
+    if (!useInputVertexEdges) {
+      return inputEdges;
+    } else {
+      VertexEdges<I, E> computeEdges =
+          configuration.createAndInitializeVertexEdges(inputEdges.size());
+      for (Edge<I, E> edge : inputEdges) {
+        computeEdges.add(edge);
+      }
+      return computeEdges;
+    }
+  }
+
+  /**
    * Move all edges from temporary storage to their source vertices.
    * Note: this method is not thread-safe.
    */
@@ -133,37 +166,90 @@ public class EdgeStore<I extends WritableComparable,
     if (LOG.isInfoEnabled()) {
       LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
     }
-    for (Map.Entry<Integer, ConcurrentMap<I,
-        VertexEdges<I, E>>> partitionEdges : transientEdges.entrySet()) {
-      Partition<I, V, E, M> partition =
-          service.getPartitionStore().getPartition(partitionEdges.getKey());
-      for (I vertexId : partitionEdges.getValue().keySet()) {
-        VertexEdges<I, E> vertexEdges =
-            partitionEdges.getValue().remove(vertexId);
-        Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
-        // If the source vertex doesn't exist, create it. Otherwise,
-        // just set the edges.
-        if (vertex == null) {
-          vertex = configuration.createVertex();
-          vertex.initialize(vertexId, configuration.createVertexValue(),
-              vertexEdges);
-          partition.putVertex(vertex);
-        } else {
-          vertex.setEdges(vertexEdges);
-          // Some Partition implementations (e.g. ByteArrayPartition) require
-          // us to put back the vertex after modifying it.
-          partition.saveVertex(vertex);
+
+    // ArrayBlockingQueue rejects a capacity of 0, and there is nothing to
+    // do anyway if this worker received no edges.
+    if (transientEdges.isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("moveEdgesToVertices: No edges to move.");
+      }
+      return;
+    }
+
+    final BlockingQueue<Integer> partitionIdQueue =
+        new ArrayBlockingQueue<Integer>(transientEdges.size());
+    partitionIdQueue.addAll(transientEdges.keySet());
+    // No point in starting more threads than there are partitions to move.
+    int numThreads = Math.min(configuration.getNumInputSplitsThreads(),
+        partitionIdQueue.size());
+    ExecutorService movePartitionExecutor =
+        Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder().setNameFormat("move-edges-%d").build());
+    List<Future<Void>> moveFutures =
+        Lists.newArrayListWithCapacity(numThreads);
+
+    for (int i = 0; i < numThreads; ++i) {
+      Callable<Void> moveCallable = new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          Integer partitionId;
+          // Each thread keeps claiming partitions until the queue is empty.
+          while ((partitionId = partitionIdQueue.poll()) != null) {
+            Partition<I, V, E, M> partition =
+                service.getPartitionStore().getPartition(partitionId);
+            ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
+                transientEdges.remove(partitionId);
+            for (I vertexId : partitionEdges.keySet()) {
+              VertexEdges<I, E> vertexEdges = convertInputToComputeEdges(
+                  partitionEdges.remove(vertexId));
+              Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+              // If the source vertex doesn't exist, create it. Otherwise,
+              // just set the edges.
+              if (vertex == null) {
+                vertex = configuration.createVertex();
+                vertex.initialize(vertexId, configuration.createVertexValue(),
+                    vertexEdges);
+                partition.putVertex(vertex);
+              } else {
+                vertex.setEdges(vertexEdges);
+                // Some Partition implementations (e.g. ByteArrayPartition)
+                // require us to put back the vertex after modifying it.
+                partition.saveVertex(vertex);
+              }
+            }
+            // Some PartitionStore implementations
+            // (e.g. DiskBackedPartitionStore) require us to put back the
+            // partition after modifying it.
+            service.getPartitionStore().putPartition(partition);
+          }
+          return null;
         }
-        progressable.progress();
-      }
-      // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
-      // require us to put back the partition after modifying it.
-      service.getPartitionStore().putPartition(partition);
+      };
+      moveFutures.add(movePartitionExecutor.submit(moveCallable));
     }
+
+    movePartitionExecutor.shutdown();
+    ProgressableUtils.awaitExecutorTermination(movePartitionExecutor,
+        progressable);
+    // submit() captures exceptions thrown by the worker threads in their
+    // Futures; check them so that a failed move is not silently ignored.
+    for (Future<Void> moveFuture : moveFutures) {
+      try {
+        moveFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "moveEdgesToVertices: Interrupted while moving edges", e);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException(
+            "moveEdgesToVertices: Moving edges failed", e.getCause());
+      }
+    }
+    transientEdges.clear();
+
     if (LOG.isInfoEnabled()) {
       LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
           "vertices.");
     }
-    transientEdges.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
 
b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index eace06f..1e05773 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -170,10 +170,13 @@ public class GiraphConfigurationValidator<I extends 
WritableComparable,
     }
   }
 
-  /** Verify matching generic types in VertexEdges. */
-  private void verifyVertexEdgesGenericTypes() {
-    Class<? extends VertexEdges<I, E>> vertexEdgesClass =
-        conf.getVertexEdgesClass();
+  /**
+   * Verify matching generic types for a specific VertexEdges class.
+   *
+   * @param vertexEdgesClass {@link VertexEdges} class to check
+   */
+  private void verifyVertexEdgesGenericTypesClass(
+      Class<? extends VertexEdges<I, E>> vertexEdgesClass) {
     List<Class<?>> classList = ReflectionUtils.getTypeArguments(
         VertexEdges.class, vertexEdgesClass);
     // VertexEdges implementations can be generic, in which case there are no
@@ -198,6 +201,16 @@ public class GiraphConfigurationValidator<I extends 
WritableComparable,
     }
   }
 
+  /**
+   * Verify matching generic types in both the computation and the input
+   * VertexEdges classes.
+   */
+  private void verifyVertexEdgesGenericTypes() {
+    verifyVertexEdgesGenericTypesClass(conf.getVertexEdgesClass());
+    // The input class may be a different VertexEdges implementation.
+    verifyVertexEdgesGenericTypesClass(conf.getInputVertexEdgesClass());
+  }
+
   /** Verify matching generic types in VertexInputFormat. */
   private void verifyVertexInputFormatGenericTypes() {
     Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index cb2a2f7..bd30455 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -203,6 +203,11 @@ public final class ConfigurationUtils {
           (Class<? extends VertexEdges>)
               Class.forName(cmd.getOptionValue("ve")));
     }
+    if (cmd.hasOption("ive")) {
+      giraphConfiguration.setInputVertexEdgesClass(
+          (Class<? extends VertexEdges>)
+              Class.forName(cmd.getOptionValue("ive")));
+    }
     if (cmd.hasOption("wc")) {
       giraphConfiguration.setWorkerContextClass(
           (Class<? extends WorkerContext>)

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index abf6950..2bba672 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -126,6 +126,7 @@ public class InternalVertexRunner {
       GiraphConfiguration conf = job.getConfiguration();
       conf.setVertexClass(classes.getVertexClass());
       conf.setVertexEdgesClass(classes.getVertexEdgesClass());
+      conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass());
       conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
       if (classes.hasVertexInputFormat()) {
         conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());

http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java 
b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
index bfc7e8b..0dcefd9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
@@ -23,14 +23,15 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.BspCase;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
 import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
@@ -39,6 +40,8 @@ import java.io.IOException;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * A test case to ensure that loading a graph from a list of edges works as
@@ -183,6 +186,38 @@ public class TestEdgeInput extends BspCase {
     assertEquals(1, (int) values.get(5));
   }
 
+  // Edges should be loaded with the input class, then converted for compute.
+  @Test
+  public void testDifferentInputEdgesClass() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphClasses classes = new GiraphClasses();
+    classes.setVertexClass(TestVertexCheckEdgesType.class);
+    classes.setVertexEdgesClass(ByteArrayEdges.class);
+    classes.setInputVertexEdgesClass(TestVertexEdgesFilterEven.class);
+    classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Map<String, String> params = ImmutableMap.of();
+    Iterable<String> results = InternalVertexRunner.run(classes, params,
+        null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Only vertices with outgoing edges in the input (1, 2 and 4) are
+    // created; vertex 3 only ever appears as a target
+    assertEquals(3, values.size());
+    // Edges 2->3 and 4->1 have odd targets and are dropped by
+    // TestVertexEdgesFilterEven while the input is loaded
+    assertEquals(1, (int) values.get(1));
+    assertEquals(1, (int) values.get(2));
+    assertEquals(0, (int) values.get(4));
+  }
+
   public static class TestVertexWithNumEdges extends Vertex<IntWritable,
         IntWritable, NullWritable, NullWritable> {
     @Override
@@ -192,8 +227,17 @@ public class TestEdgeInput extends BspCase {
     }
   }
 
+  public static class TestVertexCheckEdgesType extends TestVertexWithNumEdges {
+    @Override
+    public void compute(Iterable<NullWritable> messages) throws IOException {
+      assertTrue(getEdges() instanceof ByteArrayEdges);
+      assertFalse(getEdges() instanceof TestVertexEdgesFilterEven);
+      super.compute(messages);
+    }
+  }
+
   public static class TestVertexDoNothing extends Vertex<IntWritable,
-      IntWritable, NullWritable, NullWritable> {
+        IntWritable, NullWritable, NullWritable> {
     @Override
     public void compute(Iterable<NullWritable> messages) throws IOException {
       voteToHalt();
@@ -204,7 +248,7 @@ public class TestEdgeInput extends BspCase {
       implements VertexValueFactory<IntWritable> {
     @Override
     public void initialize(ImmutableClassesGiraphConfiguration<?, IntWritable,
-        ?, ?> configuration) { }
+            ?, ?> configuration) { }
 
     @Override
     public IntWritable createVertexValue() {
@@ -212,6 +256,16 @@ public class TestEdgeInput extends BspCase {
     }
   }
 
+  public static class TestVertexEdgesFilterEven
+      extends ByteArrayEdges<IntWritable, NullWritable> {
+    @Override
+    public void add(Edge<IntWritable, NullWritable> edge) {
+      if ((edge.getTargetVertexId().get() & 1) == 0) {
+        super.add(edge);
+      }
+    }
+  }
+
   private static Map<Integer, Integer> parseResults(Iterable<String> results) {
     Map<Integer, Integer> values = Maps.newHashMap();
     for (String line : results) {

Reply via email to