http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java index 8e6c92b..b08e7ad 100644 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java @@ -17,31 +17,40 @@ */ package org.apache.giraph.io; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import org.apache.giraph.bsp.BspUtils; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.graph.*; +import org.apache.giraph.graph.DefaultEdge; +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.GraphState; import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat; import org.apache.giraph.io.formats.TextDoubleDoubleAdjacencyListVertexInputFormat; import org.apache.giraph.vertex.EdgeListVertex; import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + import java.io.IOException; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collections; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -164,9 +173,9 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub vr.getCurrentVertex(); setGraphState(vertex, graphState); assertValidVertex(conf, graphState, vertex, new Text("Hi"), new DoubleWritable(0), - new Edge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)), - new Edge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)), - new Edge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d))); + new DefaultEdge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)), + new DefaultEdge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)), + new DefaultEdge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d))); assertEquals(vertex.getNumEdges(), 3); } @@ -192,9 +201,9 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub setGraphState(vertex, graphState); assertValidVertex(conf, graphState, vertex, new Text("BYE"), new DoubleWritable(0.01d), - new Edge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)), - new Edge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)), - new Edge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d))); + new DefaultEdge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)), + new DefaultEdge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)), + new DefaultEdge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d))); assertEquals(vertex.getNumEdges(), 3); } @@ -213,7 +222,7 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub vr.getCurrentVertex(); setGraphState(vertex, graphState); assertValidVertex(conf, graphState, vertex, new Text("alpha"), new DoubleWritable(42d), - new Edge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d))); + new DefaultEdge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d))); assertEquals(vertex.getNumEdges(), 1); }
http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java index 61e8863..185ba50 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java @@ -17,11 +17,11 @@ */ package org.apache.giraph.partition; +import org.apache.giraph.graph.DefaultEdge; import org.apache.giraph.graph.Edge; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.vertex.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.GiraphTransferRegulator; +import org.apache.giraph.vertex.EdgeListVertex; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -29,16 +29,16 @@ import org.apache.hadoop.io.LongWritable; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Test the GiraphTransferRegulator. */ @@ -75,11 +75,11 @@ public class TestGiraphTransferRegulator { job.getConfiguration() .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3); List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList(); - edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(2), + edges.add(new DefaultEdge<IntWritable, DoubleWritable>(new IntWritable(2), new DoubleWritable(22))); - edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(3), + edges.add(new DefaultEdge<IntWritable, DoubleWritable>(new IntWritable(3), new DoubleWritable(33))); - edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(4), + edges.add(new DefaultEdge<IntWritable, DoubleWritable>(new IntWritable(4), new DoubleWritable(44))); vertex.initialize(null, null, edges); GiraphTransferRegulator gtr = http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java index 73bd135..155861c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java +++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java @@ -19,12 +19,11 @@ package org.apache.giraph.vertex; import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.EdgeNoValue; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.junit.Test; -import static org.junit.Assert.assertEquals; - import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; @@ -36,6 +35,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.List; +import static org.junit.Assert.assertEquals; + /** * Tests {@link org.apache.giraph.vertex.IntIntNullIntVertex}. */ @@ -54,10 +55,8 @@ public class TestIntIntNullIntVertex { IntIntNullIntVertex vertex = new MyIntIntNullVertex(); List<Edge<IntWritable, NullWritable>> edges = Lists.newLinkedList(); - edges.add(new Edge<IntWritable, NullWritable>(new IntWritable(3), - NullWritable.get())); - edges.add(new Edge<IntWritable, NullWritable>(new IntWritable(47), - NullWritable.get())); + edges.add(new EdgeNoValue<IntWritable>(new IntWritable(3))); + edges.add(new EdgeNoValue<IntWritable>(new IntWritable(47))); vertex.initialize(new IntWritable(23), new IntWritable(7), edges); vertex.voteToHalt(); http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java index 8822d2e..a5a3545 100644 --- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java +++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java @@ -21,7 +21,7 @@ package org.apache.giraph.vertex; import com.google.common.collect.Lists; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.DefaultEdge; import org.apache.hadoop.io.IntWritable; import org.junit.Before; import org.junit.Test; @@ -77,11 +77,11 @@ public class TestMultiGraphVertex { // in order to catch corner cases: // Edge list of form: [A, B, A] - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1))); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(2))); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(10))); assertEquals(vertex.getNumEdges(), 3); assertEquals(vertex.removeEdges(new IntWritable(1)), 2); @@ -89,11 +89,11 @@ public class TestMultiGraphVertex { // Edge list of form: [A, B, B] vertex = instantiateVertex(vertexClass); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(2))); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1))); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(10))); assertEquals(vertex.getNumEdges(), 3); assertEquals(vertex.removeEdges(new IntWritable(1)), 2); @@ -101,11 +101,11 @@ public class TestMultiGraphVertex { // Edge list of form: [A, A, B] vertex = instantiateVertex(vertexClass); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1))); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(10))); - vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2), + vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(2))); assertEquals(vertex.getNumEdges(), 3); assertEquals(vertex.removeEdges(new IntWritable(1)), 2); http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java index 9ed2d81..ca4ba1a 100644 --- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java +++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java @@ -17,14 +17,18 @@ */ package org.apache.giraph.vertex; -import com.google.common.collect.Lists; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.DefaultEdge; import org.apache.giraph.graph.Edge; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; -import org.apache.giraph.utils.*; +import org.apache.giraph.utils.DynamicChannelBufferInputStream; +import org.apache.giraph.utils.DynamicChannelBufferOutputStream; +import org.apache.giraph.utils.UnsafeByteArrayInputStream; +import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -32,11 +36,16 @@ import org.apache.hadoop.io.LongWritable; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.Collection; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * Test all the mutable vertices (except multigraph versions) @@ -138,7 +147,7 @@ public class TestMutableVertex { List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList(); for (int i = 1000; i > 0; --i) { - edges.add(new Edge<IntWritable, DoubleWritable>( + edges.add(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(i), new DoubleWritable(i * 2.0))); } @@ -173,7 +182,7 @@ public class TestMutableVertex { List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList(); for (int i = 1000; i > 0; --i) { - edges.add(new Edge<IntWritable, DoubleWritable>( + edges.add(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(i), new DoubleWritable(i * 3.0))); } @@ -211,19 +220,19 @@ public class TestMutableVertex { vertex.initialize(new IntWritable(0), new FloatWritable(0.0f)); assertEquals(vertex.getNumEdges(), 0); - assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>( + assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(2), new DoubleWritable(2.0)))); assertEquals(vertex.getNumEdges(), 1); assertEquals(vertex.getEdgeValue(new IntWritable(2)), new DoubleWritable(2.0)); - assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>( + assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(4), new DoubleWritable(4.0)))); - assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>( + assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(3), new DoubleWritable(3.0)))); - assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>( + assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(1), new DoubleWritable(1.0)))); assertEquals(vertex.getNumEdges(), 4); @@ -271,7 +280,7 @@ public class TestMutableVertex { List<Edge<IntWritable, DoubleWritable>> edges = Lists.newArrayListWithCapacity(edgesCount); for (int i = edgesCount; i > 0; --i) { - edges.add(new Edge<IntWritable, DoubleWritable>( + edges.add(new DefaultEdge<IntWritable, DoubleWritable>( new IntWritable(i), new DoubleWritable(i * 2.0))); } vertex.initialize(new IntWritable(2), new FloatWritable(3.0f), edges); http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java index 18fee52..a823971 100644 --- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java +++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java @@ -17,10 +17,11 @@ */ package org.apache.giraph.io.hbase.edgemarker; +import org.apache.giraph.graph.DefaultEdge; import org.apache.giraph.graph.Edge; -import org.apache.giraph.vertex.Vertex; import org.apache.giraph.io.VertexReader; import org.apache.giraph.io.hbase.HBaseVertexInputFormat; +import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; @@ -86,7 +87,7 @@ public class TableEdgeInputFormat extends String edge = Bytes.toString(row.getValue(CF, CHILDREN)); Text vertexValue = new Text(); Text edgeId = new Text(edge); - edges.add(new Edge<Text, Text>(edgeId, uselessEdgeValue)); + edges.add(new DefaultEdge<Text, Text>(edgeId, uselessEdgeValue)); vertex.initialize(vertexId, vertexValue, edges); return vertex; http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java index 018972e..c92cc34 100644 --- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java +++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java @@ -18,10 +18,12 @@ package org.apache.giraph.io.hcatalog; -import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.DefaultEdge; +import org.apache.giraph.graph.EdgeNoValue; +import org.apache.giraph.graph.EdgeWithSource; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; -import org.apache.giraph.graph.EdgeWithSource; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -55,14 +57,36 @@ public abstract class HCatalogEdgeInputFormat< } /** + * Get underlying HCatalog input format. Used for creating readers. + * + * @return GiraphHCatInputFormat stored. + */ + protected GiraphHCatInputFormat getHCatInputFormat() { + return hCatInputFormat; + } + + /** * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}. */ - protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> { + protected abstract static class HCatalogEdgeReader< + I extends WritableComparable, E extends Writable> + implements EdgeReader<I, E> { + /** HCatalog input format to use */ + private final GiraphHCatInputFormat hCatInputFormat; /** Internal {@link RecordReader}. */ private RecordReader<WritableComparable, HCatRecord> hCatRecordReader; /** Context passed to initialize. */ private TaskAttemptContext context; + /** + * Constructor taking hcat input format to use. + * + * @param hCatInputFormat HCatalog input format + */ + public HCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) { + this.hCatInputFormat = hCatInputFormat; + } + @Override public final void initialize(InputSplit inputSplit, TaskAttemptContext context) @@ -110,10 +134,10 @@ public abstract class HCatalogEdgeInputFormat< /** * Create {@link EdgeReader}. - + * * @return {@link HCatalogEdgeReader} instance. */ - protected abstract HCatalogEdgeReader createEdgeReader(); + protected abstract HCatalogEdgeReader<I, E> createEdgeReader(); @Override public EdgeReader<I, E> @@ -133,8 +157,17 @@ public abstract class HCatalogEdgeInputFormat< * {@link HCatalogEdgeReader} for tables holding a complete edge * in each row. */ - protected abstract class SingleRowHCatalogEdgeReader - extends HCatalogEdgeReader { + protected abstract static class SingleRowHCatalogEdgeReader< + I extends WritableComparable, E extends Writable> + extends HCatalogEdgeReader<I, E> { + /** + * Constructor + * @param hCatInputFormat giraph input format to use + */ + public SingleRowHCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) { + super(hCatInputFormat); + } + /** * Get source vertex id from a record. * @@ -165,7 +198,50 @@ public abstract class HCatalogEdgeInputFormat< HCatRecord record = getRecordReader().getCurrentValue(); return new EdgeWithSource<I, E>( getSourceVertexId(record), - new Edge<I, E>(getTargetVertexId(record), getEdgeValue(record))); + new DefaultEdge<I, E>(getTargetVertexId(record), + getEdgeValue(record))); + } + } + + /** + * {@link HCatalogEdgeReader} for tables holding a complete edge + * in each row where the edges contain no data other than IDs they point to. + */ + protected abstract static class SingleRowHCatalogEdgeNoValueReader< + I extends WritableComparable> + extends HCatalogEdgeReader<I, NullWritable> { + /** + * Constructor + * @param hCatInputFormat giraph input format to use + */ + public SingleRowHCatalogEdgeNoValueReader( + GiraphHCatInputFormat hCatInputFormat) { + super(hCatInputFormat); + } + + /** + * Get source vertex id from a record. + * + * @param record Input record + * @return I Source vertex id + */ + protected abstract I getSourceVertexId(HCatRecord record); + + /** + * Get target vertex id from a record. + * + * @param record Input record + * @return I Target vertex id + */ + protected abstract I getTargetVertexId(HCatRecord record); + + @Override + public EdgeWithSource<I, NullWritable> getCurrentEdge() throws IOException, + InterruptedException { + HCatRecord record = getRecordReader().getCurrentValue(); + return new EdgeWithSource<I, NullWritable>( + getSourceVertexId(record), + new EdgeNoValue<I>(getTargetVertexId(record))); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java index 52b9ae3..319242d 100644 --- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java +++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java @@ -18,13 +18,13 @@ package org.apache.giraph.io.hcatalog; -import com.google.common.collect.Lists; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.DefaultEdge; import org.apache.giraph.graph.Edge; -import org.apache.giraph.vertex.Vertex; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; import org.apache.giraph.utils.TimedLogger; +import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -34,6 +34,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hcatalog.data.HCatRecord; import org.apache.log4j.Logger; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.List; @@ -353,7 +355,7 @@ public abstract class HCatalogVertexInputFormat< currentVertexId = getVertexId(record); } if (currentVertexId.equals(getVertexId(record))) { - currentEdges.add(new Edge<I, E>( + currentEdges.add(new DefaultEdge<I, E>( getTargetVertexId(record), getEdgeValue(record))); recordsForVertex.add(record);
