http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java deleted file mode 100644 index 45d946f..0000000 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.io; - -import org.apache.giraph.BspCase; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.edge.ByteArrayEdges; -import org.apache.giraph.edge.Edge; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.factories.VertexValueFactory; -import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; -import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat; -import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat; -import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; -import org.apache.giraph.utils.ComputationCountEdges; -import org.apache.giraph.utils.IntIntNullNoOpComputation; -import org.apache.giraph.utils.InternalVertexRunner; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; -import org.junit.Test; - -import com.google.common.collect.Maps; - -import java.io.IOException; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * A test case to ensure that loading a graph from a list of edges works as - * expected. - */ -public class TestEdgeInput extends BspCase { - public TestEdgeInput() { - super(TestEdgeInput.class.getName()); - } - - // It should be able to build a graph starting from the edges only. - // Vertices should be implicitly created with default values. 
- @Test - public void testEdgesOnly() throws Exception { - String[] edges = new String[] { - "1 2", - "2 3", - "2 4", - "4 1" - }; - - GiraphConfiguration conf = new GiraphConfiguration(); - conf.setComputationClass(ComputationCountEdges.class); - conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); - conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); - Iterable<String> results = InternalVertexRunner.run(conf, null, edges); - - Map<Integer, Integer> values = parseResults(results); - - // Check that all vertices with outgoing edges have been created - assertEquals(3, values.size()); - // Check the number of edges for each vertex - assertEquals(1, (int) values.get(1)); - assertEquals(2, (int) values.get(2)); - assertEquals(1, (int) values.get(4)); - } - - // It should be able to build a graph starting from the edges only. - // Using ReverseEdgeDuplicator it should also create the reverse edges. - // Vertices should be implicitly created with default values. 
- @Test - public void testEdgesOnlyWithReverse() throws Exception { - String[] edges = new String[] { - "1 2", - "2 3", - "2 4", - "4 1" - }; - - GiraphConfiguration conf = new GiraphConfiguration(); - conf.setComputationClass(ComputationCountEdges.class); - conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class); - conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); - Iterable<String> results = InternalVertexRunner.run(conf, null, edges); - - Map<Integer, Integer> values = parseResults(results); - - // Check that all vertices with outgoing edges have been created - assertEquals(4, values.size()); - // Check the number of edges for each vertex - assertEquals(2, (int) values.get(1)); - assertEquals(3, (int) values.get(2)); - assertEquals(1, (int) values.get(3)); - assertEquals(2, (int) values.get(4)); - } - - // It should be able to build a graph by specifying vertex data and edges - // as separate input formats. 
- @Test - public void testMixedFormat() throws Exception { - String[] vertices = new String[] { - "1 75", - "2 34", - "3 13", - "4 32" - }; - String[] edges = new String[] { - "1 2", - "2 3", - "2 4", - "4 1", - "5 3" - }; - - GiraphConfiguration conf = new GiraphConfiguration(); - conf.setComputationClass(IntIntNullNoOpComputation.class); - conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); - conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); - conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); - - // Run a job with a vertex that does nothing - Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges); - - Map<Integer, Integer> values = parseResults(results); - - // Check that all vertices with either initial values or outgoing edges - // have been created - assertEquals(5, values.size()); - // Check that the vertices have been created with correct values - assertEquals(75, (int) values.get(1)); - assertEquals(34, (int) values.get(2)); - assertEquals(13, (int) values.get(3)); - assertEquals(32, (int) values.get(4)); - // A vertex with edges but no initial value should have the default value - assertEquals(0, (int) values.get(5)); - - // Run a job with a custom VertexValueFactory - conf.setVertexValueFactoryClass(TestVertexValueFactory.class); - results = InternalVertexRunner.run(conf, vertices, edges); - values = parseResults(results); - // A vertex with edges but no initial value should have been constructed - // by the custom factory - assertEquals(3, (int) values.get(5)); - - conf = new GiraphConfiguration(); - conf.setComputationClass(ComputationCountEdges.class); - conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); - conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); - conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); - - // Run a job with a vertex 
that counts outgoing edges - results = InternalVertexRunner.run(conf, vertices, edges); - - values = parseResults(results); - - // Check the number of edges for each vertex - assertEquals(1, (int) values.get(1)); - assertEquals(2, (int) values.get(2)); - assertEquals(0, (int) values.get(3)); - assertEquals(1, (int) values.get(4)); - assertEquals(1, (int) values.get(5)); - } - - // It should use the specified input OutEdges class. - @Test - public void testDifferentInputEdgesClass() throws Exception { - String[] edges = new String[] { - "1 2", - "2 3", - "2 4", - "4 1" - }; - - GiraphConfiguration conf = new GiraphConfiguration(); - conf.setComputationClass(TestComputationCheckEdgesType.class); - conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class); - conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); - conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); - Iterable<String> results = InternalVertexRunner.run(conf, null, edges); - - Map<Integer, Integer> values = parseResults(results); - - // Check that all vertices with outgoing edges in the input have been - // created - assertEquals(3, values.size()); - // Check the number of edges for each vertex (edges with odd target id - // should have been removed) - assertEquals(1, (int) values.get(1)); - assertEquals(1, (int) values.get(2)); - assertEquals(0, (int) values.get(4)); - } - - public static class TestComputationCheckEdgesType extends - ComputationCountEdges { - @Override - public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex, - Iterable<NullWritable> messages) throws IOException { - assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven); - assertTrue(vertex.getEdges() instanceof ByteArrayEdges); - super.compute(vertex, messages); - } - } - - public static class TestVertexValueFactory - implements VertexValueFactory<IntWritable> { - @Override - public void initialize(ImmutableClassesGiraphConfiguration 
conf) { } - - @Override - public Class<IntWritable> getValueClass() { - return IntWritable.class; - } - - @Override - public IntWritable newInstance() { - return new IntWritable(3); - } - } - - public static class TestOutEdgesFilterEven - extends ByteArrayEdges<IntWritable, NullWritable> { - @Override - public void add(Edge<IntWritable, NullWritable> edge) { - if (edge.getTargetVertexId().get() % 2 == 0) { - super.add(edge); - } - } - } - - private static Map<Integer, Integer> parseResults(Iterable<String> results) { - Map<Integer, Integer> values = Maps.newHashMap(); - for (String line : results) { - String[] tokens = line.split("\\s+"); - int id = Integer.valueOf(tokens[0]); - int value = Integer.valueOf(tokens[1]); - values.put(id, value); - } - return values; - } -}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java new file mode 100644 index 0000000..721a74c --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io; + +import com.google.common.collect.Maps; +import org.apache.giraph.BspCase; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.factories.VertexValueFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexValueCombiner; +import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat; +import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat; +import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat; +import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; +import org.apache.giraph.utils.ComputationCountEdges; +import org.apache.giraph.utils.IntIntNullNoOpComputation; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.*; + +/** + * A test case to ensure that loading a graph from vertices and edges works as + * expected. + */ +public class TestVertexEdgeInput extends BspCase { + public TestVertexEdgeInput() { + super(TestVertexEdgeInput.class.getName()); + } + + // It should be able to build a graph starting from the edges only. + // Vertices should be implicitly created with default values. 
+ @Test + public void testEdgesOnly() throws Exception { + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(ComputationCountEdges.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + Iterable<String> results = InternalVertexRunner.run(conf, null, edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with outgoing edges have been created + assertEquals(3, values.size()); + // Check the number of edges for each vertex + assertEquals(1, (int) values.get(1)); + assertEquals(2, (int) values.get(2)); + assertEquals(1, (int) values.get(4)); + } + + // It should be able to build a graph starting from the edges only. + // Using ReverseEdgeDuplicator it should also create the reverse edges. + // Vertices should be implicitly created with default values. 
+ @Test + public void testEdgesOnlyWithReverse() throws Exception { + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(ComputationCountEdges.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + Iterable<String> results = InternalVertexRunner.run(conf, null, edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with outgoing edges have been created + assertEquals(4, values.size()); + // Check the number of edges for each vertex + assertEquals(2, (int) values.get(1)); + assertEquals(3, (int) values.get(2)); + assertEquals(1, (int) values.get(3)); + assertEquals(2, (int) values.get(4)); + } + + /** + * Simple vertex value combiner that sums up the vertex values. + */ + public static class IntSumVertexValueCombiner implements VertexValueCombiner<IntWritable> { + @Override + public void combine(IntWritable originalVertexValue, IntWritable vertexValue) { + originalVertexValue.set(originalVertexValue.get() + vertexValue.get()); + } + } + + // It should be able to build a graph by specifying vertex value data + // and combining the duplicates (summation). Edges should be added as well. 
+ @Test + public void testVertexValueCombiner() throws Exception { + String[] vertices = new String[] { + "1 75 2", + "2 34 3", + "3 13", + "4 32", + "1 11", + "2 23 1", + "2 3" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(IntIntNullNoOpComputation.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class); + conf.setVertexValueCombinerClass(IntSumVertexValueCombiner.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + // Run a job with a vertex that does nothing + Iterable<String> results = InternalVertexRunner.run(conf, vertices); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices were created + assertEquals(4, values.size()); + // Check that the vertices have been created with correct values + assertEquals(86, (int) values.get(1)); + assertEquals(60, (int) values.get(2)); + assertEquals(13, (int) values.get(3)); + assertEquals(32, (int) values.get(4)); + + // Run a job with a vertex that counts outgoing edges + conf.setComputationClass(ComputationCountEdges.class); + results = InternalVertexRunner.run(conf, vertices); + + // Check that the edges were added as well + values = parseResults(results); + assertEquals(1, (int) values.get(1)); + assertEquals(2, (int) values.get(2)); + assertEquals(0, (int) values.get(3)); + assertEquals(0, (int) values.get(4)); + } + + // It should be able to build a graph by specifying vertex value data + // and edges as separate input formats. 
+ @Test + public void testMixedVertexValueEdgeFormat() throws Exception { + String[] vertices = new String[] { + "1 75", + "2 34", + "3 13", + "4 32" + }; + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1", + "5 3" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(IntIntNullNoOpComputation.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + // Run a job with a vertex that does nothing + Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with either initial values or outgoing edges + // have been created + assertEquals(5, values.size()); + // Check that the vertices have been created with correct values + assertEquals(75, (int) values.get(1)); + assertEquals(34, (int) values.get(2)); + assertEquals(13, (int) values.get(3)); + assertEquals(32, (int) values.get(4)); + // A vertex with edges but no initial value should have the default value + assertEquals(0, (int) values.get(5)); + + // Run a job with a custom VertexValueFactory + conf.setVertexValueFactoryClass(TestVertexValueFactory.class); + results = InternalVertexRunner.run(conf, vertices, edges); + values = parseResults(results); + // A vertex with edges but no initial value should have been constructed + // by the custom factory + assertEquals(3, (int) values.get(5)); + + conf = new GiraphConfiguration(); + conf.setComputationClass(ComputationCountEdges.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + // Run a job 
with a vertex that counts outgoing edges + results = InternalVertexRunner.run(conf, vertices, edges); + + values = parseResults(results); + + // Check the number of edges for each vertex + assertEquals(1, (int) values.get(1)); + assertEquals(2, (int) values.get(2)); + assertEquals(0, (int) values.get(3)); + assertEquals(1, (int) values.get(4)); + assertEquals(1, (int) values.get(5)); + } + + // It should be able to build a graph by specifying vertices and edges + // as separate input formats. + @Test + public void testMixedVertexEdgeFormat() throws Exception { + String[] vertices = new String[] { + "1 75 2 3", + "2 34 1 5", + "3 13", + "4 32" + }; + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1", + "5 3" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(IntIntNullNoOpComputation.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + // Run a job with a vertex that does nothing + Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with either initial values or outgoing edges + // have been created + assertEquals(5, values.size()); + // Check that the vertices have been created with correct values + assertEquals(75, (int) values.get(1)); + assertEquals(34, (int) values.get(2)); + assertEquals(13, (int) values.get(3)); + assertEquals(32, (int) values.get(4)); + // A vertex with edges but no initial value should have the default value + assertEquals(0, (int) values.get(5)); + + // Run a job with a custom VertexValueFactory + conf.setVertexValueFactoryClass(TestVertexValueFactory.class); + results = InternalVertexRunner.run(conf, vertices, edges); + values = parseResults(results); + // A 
vertex with edges but no initial value should have been constructed + // by the custom factory + assertEquals(3, (int) values.get(5)); + + conf = new GiraphConfiguration(); + conf.setComputationClass(ComputationCountEdges.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + + // Run a job with a vertex that counts outgoing edges + results = InternalVertexRunner.run(conf, vertices, edges); + + values = parseResults(results); + + // Check the number of edges for each vertex + assertEquals(3, (int) values.get(1)); + assertEquals(4, (int) values.get(2)); + assertEquals(0, (int) values.get(3)); + assertEquals(1, (int) values.get(4)); + assertEquals(1, (int) values.get(5)); + } + + // It should use the specified input OutEdges class. + @Test + public void testDifferentInputEdgesClass() throws Exception { + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(TestComputationCheckEdgesType.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + Iterable<String> results = InternalVertexRunner.run(conf, null, edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with outgoing edges in the input have been + // created + assertEquals(3, values.size()); + // Check the number of edges for each vertex (edges with odd target id + // should have been removed) + assertEquals(1, (int) values.get(1)); + assertEquals(1, (int) values.get(2)); + assertEquals(0, (int) values.get(4)); + } + + public static class TestComputationCheckEdgesType 
extends + ComputationCountEdges { + @Override + public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex, + Iterable<NullWritable> messages) throws IOException { + assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven); + assertTrue(vertex.getEdges() instanceof ByteArrayEdges); + super.compute(vertex, messages); + } + } + + public static class TestVertexValueFactory + implements VertexValueFactory<IntWritable> { + @Override + public void initialize(ImmutableClassesGiraphConfiguration conf) { } + + @Override + public Class<IntWritable> getValueClass() { + return IntWritable.class; + } + + @Override + public IntWritable newInstance() { + return new IntWritable(3); + } + } + + public static class TestOutEdgesFilterEven + extends ByteArrayEdges<IntWritable, NullWritable> { + @Override + public void add(Edge<IntWritable, NullWritable> edge) { + if (edge.getTargetVertexId().get() % 2 == 0) { + super.add(edge); + } + } + } + + private static Map<Integer, Integer> parseResults(Iterable<String> results) { + Map<Integer, Integer> values = Maps.newHashMap(); + for (String line : results) { + String[] tokens = line.split("\\s+"); + int id = Integer.valueOf(tokens[0]); + int value = Integer.valueOf(tokens[1]); + values.put(id, value); + } + return values; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java index b62775f..4a8caaa 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java @@ -18,7 +18,7 @@ package org.apache.giraph.master; -import 
org.apache.giraph.combiner.Combiner; +import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.AbstractComputation; @@ -47,7 +47,7 @@ public class TestComputationCombinerTypes { public void testAllMatchWithCombiner() { SuperstepClasses classes = new SuperstepClasses(IntIntIntLongDoubleComputation.class, - IntDoubleCombiner.class); + IntDoubleMessageCombiner.class); classes.verifyTypesMatch( createConfiguration(IntIntIntIntLongComputation.class), true); } @@ -88,7 +88,7 @@ public class TestComputationCombinerTypes { public void testDifferentCombinerIdType() { SuperstepClasses classes = new SuperstepClasses(IntIntIntLongDoubleComputation.class, - DoubleDoubleCombiner.class); + DoubleDoubleMessageCombiner.class); classes.verifyTypesMatch( createConfiguration(IntIntIntIntLongComputation.class), true); } @@ -97,7 +97,7 @@ public class TestComputationCombinerTypes { public void testDifferentCombinerMessageType() { SuperstepClasses classes = new SuperstepClasses(IntIntIntLongDoubleComputation.class, - IntLongCombiner.class); + IntLongMessageCombiner.class); classes.verifyTypesMatch( createConfiguration(IntIntIntIntLongComputation.class), true); } @@ -138,8 +138,8 @@ public class TestComputationCombinerTypes { NoOpComputation<IntWritable, IntWritable, LongWritable, LongWritable, IntWritable> { } - private static class NoOpCombiner<I extends WritableComparable, - M extends Writable> extends Combiner<I, M> { + private static class NoOpMessageCombiner<I extends WritableComparable, + M extends Writable> extends MessageCombiner<I, M> { @Override public void combine(I vertexIndex, M originalMessage, M messageToCombine) { } @@ -150,12 +150,15 @@ public class TestComputationCombinerTypes { } } - private static class IntDoubleCombiner extends NoOpCombiner<IntWritable, - DoubleWritable> { } + private static class IntDoubleMessageCombiner + 
extends NoOpMessageCombiner<IntWritable, + DoubleWritable> { } - private static class DoubleDoubleCombiner extends NoOpCombiner<DoubleWritable, - DoubleWritable> { } + private static class DoubleDoubleMessageCombiner + extends NoOpMessageCombiner<DoubleWritable, + DoubleWritable> { } - private static class IntLongCombiner extends NoOpCombiner<IntWritable, - LongWritable> { } + private static class IntLongMessageCombiner + extends NoOpMessageCombiner<IntWritable, + LongWritable> { } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java index 6b0ed35..e96fd12 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java @@ -18,7 +18,7 @@ package org.apache.giraph.master; -import org.apache.giraph.combiner.Combiner; +import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.graph.AbstractComputation; import org.apache.giraph.graph.Vertex; @@ -40,7 +40,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; -/** Test switching Computation and Combiner class during application */ +/** Test switching Computation and MessageCombiner class during application */ public class TestSwitchClasses { @Test public void testSwitchingClasses() throws Exception { @@ -57,32 +57,41 @@ public class TestSwitchClasses { graph = InternalVertexRunner.run(conf, graph); Assert.assertEquals(2, graph.getVertices().size()); - StatusValue value1 = graph.getVertex(id1).getValue(); - StatusValue value2 = graph.getVertex(id2).getValue(); + } + private static void 
checkVerticesOnFinalSuperstep( + Vertex<IntWritable, StatusValue, IntWritable> vertex) { // Check that computations were performed in expected order - ArrayList<Integer> expectedComputations = Lists.newArrayList(1, 1, 2, 3, 1); - checkComputations(expectedComputations, value1.computations); - checkComputations(expectedComputations, value2.computations); - + final ArrayList<Integer> expectedComputations = + Lists.newArrayList(1, 1, 2, 3, 1); + checkComputations(expectedComputations, vertex.getValue().computations); // Check that messages were sent in the correct superstep, // and combined when needed - ArrayList<HashSet<Double>> messages1 = - Lists.newArrayList( - Sets.<Double>newHashSet(), - Sets.<Double>newHashSet(11d), - Sets.<Double>newHashSet(11d), - Sets.<Double>newHashSet(101.5, 201.5), - Sets.<Double>newHashSet(3002d)); - checkMessages(messages1, value1.messagesReceived); - ArrayList<HashSet<Double>> messages2 = - Lists.newArrayList( - Sets.<Double>newHashSet(), - Sets.<Double>newHashSet(12d), - Sets.<Double>newHashSet(12d), - Sets.<Double>newHashSet(102.5, 202.5), - Sets.<Double>newHashSet(3004d)); - checkMessages(messages2, value2.messagesReceived); + switch (vertex.getId().get()) { + case 1: + ArrayList<HashSet<Double>> messages1 = + Lists.newArrayList( + Sets.<Double>newHashSet(), + Sets.<Double>newHashSet(11d), + Sets.<Double>newHashSet(11d), + Sets.<Double>newHashSet(101.5, 201.5), + Sets.<Double>newHashSet(3002d)); + checkMessages(messages1, vertex.getValue().messagesReceived); + break; + case 2: + ArrayList<HashSet<Double>> messages2 = + Lists.newArrayList( + Sets.<Double>newHashSet(), + Sets.<Double>newHashSet(12d), + Sets.<Double>newHashSet(12d), + Sets.<Double>newHashSet(102.5, 202.5), + Sets.<Double>newHashSet(3004d)); + checkMessages(messages2, vertex.getValue().messagesReceived); + break; + default: + throw new IllegalStateException("checkVertices: Illegal vertex " + + vertex); + } } private static void checkComputations(ArrayList<Integer> 
expected, @@ -113,7 +122,7 @@ public class TestSwitchClasses { switch ((int) getSuperstep()) { case 0: setComputation(Computation1.class); - setCombiner(MinimumCombiner.class); + setMessageCombiner(MinimumMessageCombiner.class); break; case 1: // test classes don't change @@ -121,11 +130,11 @@ public class TestSwitchClasses { case 2: setComputation(Computation2.class); // test combiner removed - setCombiner(null); + setMessageCombiner(null); break; case 3: setComputation(Computation3.class); - setCombiner(SumCombiner.class); + setMessageCombiner(SumMessageCombiner.class); break; case 4: setComputation(Computation1.class); @@ -147,6 +156,10 @@ public class TestSwitchClasses { IntWritable otherId = new IntWritable(3 - vertex.getId().get()); sendMessage(otherId, new IntWritable(otherId.get() + 10)); sendMessage(otherId, new IntWritable(otherId.get() + 20)); + // Check the vertices on the final superstep + if (getSuperstep() == 4) { + checkVerticesOnFinalSuperstep(vertex); + } } } @@ -179,8 +192,9 @@ public class TestSwitchClasses { } } - public static class MinimumCombiner extends Combiner<IntWritable, - IntWritable> { + public static class MinimumMessageCombiner + extends MessageCombiner<IntWritable, + IntWritable> { @Override public void combine(IntWritable vertexIndex, IntWritable originalMessage, IntWritable messageToCombine) { @@ -194,7 +208,8 @@ public class TestSwitchClasses { } } - public static class SumCombiner extends Combiner<IntWritable, IntWritable> { + public static class SumMessageCombiner + extends MessageCombiner<IntWritable, IntWritable> { @Override public void combine(IntWritable vertexIndex, IntWritable originalMessage, IntWritable messageToCombine) { @@ -232,6 +247,12 @@ public class TestSwitchClasses { } @Override + public String toString() { + return "(computations=" + computations + + ",messagesReceived=" + messagesReceived + ")"; + } + + @Override public void write(DataOutput dataOutput) throws IOException { 
dataOutput.writeInt(computations.size()); for (Integer computation : computations) { http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 0e68b56..0dd9b9c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -199,15 +199,15 @@ public class TestPartitionStores { partitionStore.addPartition(createPartition(conf, 4, v7)); Partition<IntWritable, IntWritable, NullWritable> partition1 = - partitionStore.getPartition(1); + partitionStore.getOrCreatePartition(1); partitionStore.putPartition(partition1); Partition<IntWritable, IntWritable, NullWritable> partition2 = - partitionStore.getPartition(2); + partitionStore.getOrCreatePartition(2); partitionStore.putPartition(partition2); Partition<IntWritable, IntWritable, NullWritable> partition3 = partitionStore.removePartition(3); Partition<IntWritable, IntWritable, NullWritable> partition4 = - partitionStore.getPartition(4); + partitionStore.getOrCreatePartition(4); partitionStore.putPartition(partition4); assertEquals(3, partitionStore.getNumPartitions()); @@ -215,7 +215,7 @@ public class TestPartitionStores { int partitionsNumber = 0; for (Integer partitionId : partitionStore.getPartitionIds()) { Partition<IntWritable, IntWritable, NullWritable> p = - partitionStore.getPartition(partitionId); + partitionStore.getOrCreatePartition(partitionId); partitionStore.putPartition(p); partitionsNumber++; } @@ -225,13 +225,13 @@ public class TestPartitionStores { assertTrue(partitionStore.hasPartition(2)); assertFalse(partitionStore.hasPartition(3)); 
assertTrue(partitionStore.hasPartition(4)); - partition = partitionStore.getPartition(1); + partition = partitionStore.getOrCreatePartition(1); assertEquals(3, partition.getVertexCount()); partitionStore.putPartition(partition); - partition = partitionStore.getPartition(2); + partition = partitionStore.getOrCreatePartition(2); assertEquals(2, partition.getVertexCount()); partitionStore.putPartition(partition); - partition = partitionStore.getPartition(4); + partition = partitionStore.getOrCreatePartition(4); assertEquals(1, partition.getVertexCount()); assertEquals(2, partition.getEdgeCount()); partitionStore.putPartition(partition); http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java index 115da7e..5612e5f 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java @@ -19,7 +19,7 @@ package org.apache.giraph; import org.apache.giraph.aggregators.TextAggregatorWriter; -import org.apache.giraph.combiner.SimpleSumCombiner; +import org.apache.giraph.combiner.SimpleSumMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -290,7 +290,8 @@ public class } /** - * Run a sample BSP job locally with combiner and checkout output value. + * Run a sample BSP job locally with message combiner and + * checkout output value. 
* * @throws IOException * @throws ClassNotFoundException @@ -302,7 +303,7 @@ public class GiraphConfiguration conf = new GiraphConfiguration(); conf.setComputationClass(SimpleCombinerComputation.class); conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - conf.setCombinerClass(SimpleSumCombiner.class); + conf.setMessageCombinerClass(SimpleSumMessageCombiner.class); GiraphJob job = prepareJob(getCallingMethodName(), conf); assertTrue(job.run(true)); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java index 7d326da..8883d6f 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java @@ -18,7 +18,7 @@ package org.apache.giraph.examples; -import org.apache.giraph.combiner.MinimumIntCombiner; +import org.apache.giraph.combiner.MinimumIntMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.edge.ByteArrayEdges; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; @@ -68,7 +68,7 @@ public class ConnectedComponentsComputationTest { GiraphConfiguration conf = new GiraphConfiguration(); conf.setComputationClass(ConnectedComponentsComputation.class); conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setCombinerClass(MinimumIntCombiner.class); + conf.setMessageCombinerClass(MinimumIntMessageCombiner.class); conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java index dbcd569..1bb8e94 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java @@ -18,7 +18,7 @@ package org.apache.giraph.examples; -import org.apache.giraph.combiner.MinimumIntCombiner; +import org.apache.giraph.combiner.MinimumIntMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.edge.ByteArrayEdges; import org.apache.giraph.graph.Vertex; @@ -59,7 +59,7 @@ public class ConnectedComponentsComputationTestInMemory { GiraphConfiguration conf = new GiraphConfiguration(); conf.setComputationClass(ConnectedComponentsComputation.class); conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setCombinerClass(MinimumIntCombiner.class); + conf.setMessageCombinerClass(MinimumIntMessageCombiner.class); TestGraph<IntWritable, IntWritable, NullWritable> graph = new TestGraph<IntWritable, IntWritable, NullWritable>(conf); http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java index 434c756..aa6cd8a 100644 --- 
a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java @@ -18,26 +18,27 @@ package org.apache.giraph.examples; -import static org.junit.Assert.assertEquals; - -import org.apache.giraph.combiner.Combiner; -import org.apache.giraph.combiner.MinimumIntCombiner; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.MinimumIntMessageCombiner; import org.apache.hadoop.io.IntWritable; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class MinimumIntCombinerTest { @Test public void testCombiner() throws Exception { - Combiner<IntWritable, IntWritable> combiner = - new MinimumIntCombiner(); + MessageCombiner<IntWritable, IntWritable> + messageCombiner = + new MinimumIntMessageCombiner(); IntWritable vertexId = new IntWritable(1); - IntWritable result = combiner.createInitialMessage(); - combiner.combine(vertexId, result, new IntWritable(39947466)); - combiner.combine(vertexId, result, new IntWritable(199)); - combiner.combine(vertexId, result, new IntWritable(42)); - combiner.combine(vertexId, result, new IntWritable(19998888)); + IntWritable result = messageCombiner.createInitialMessage(); + messageCombiner.combine(vertexId, result, new IntWritable(39947466)); + messageCombiner.combine(vertexId, result, new IntWritable(199)); + messageCombiner.combine(vertexId, result, new IntWritable(42)); + messageCombiner.combine(vertexId, result, new IntWritable(19998888)); assertEquals(42, result.get()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java 
b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java index 1323ff6..77bf3f3 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java @@ -18,7 +18,7 @@ package org.apache.giraph.examples; -import org.apache.giraph.combiner.MinimumIntCombiner; +import org.apache.giraph.combiner.MinimumIntMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.edge.ByteArrayEdges; @@ -72,7 +72,7 @@ public class TryMultiIpcBindingPortsTest { GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.set(conf, true); conf.setComputationClass(ConnectedComponentsComputation.class); conf.setOutEdgesClass(ByteArrayEdges.class); - conf.setCombinerClass(MinimumIntCombiner.class); + conf.setMessageCombinerClass(MinimumIntMessageCombiner.class); conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java index 4f74fcb..a481db3 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java +++ b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java @@ -18,7 +18,7 @@ package org.apache.giraph.vertex; -import org.apache.giraph.combiner.Combiner; +import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import 
org.apache.giraph.edge.ByteArrayEdges; @@ -63,8 +63,9 @@ public class TestComputationTypes { /** * Matches the {@link GeneratedComputationMatch} */ - private static class GeneratedVertexMatchCombiner extends - Combiner<LongWritable, FloatWritable> { + private static class GeneratedVertexMatchMessageCombiner + extends + MessageCombiner<LongWritable, FloatWritable> { @Override public void combine(LongWritable vertexIndex, FloatWritable originalMessage, @@ -80,8 +81,9 @@ public class TestComputationTypes { /** * Mismatches the {@link GeneratedComputationMatch} */ - private static class GeneratedVertexMismatchCombiner extends - Combiner<LongWritable, DoubleWritable> { + private static class GeneratedVertexMismatchMessageCombiner + extends + MessageCombiner<LongWritable, DoubleWritable> { @Override public void combine(LongWritable vertexIndex, DoubleWritable originalMessage, @@ -136,8 +138,8 @@ public class TestComputationTypes { GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class); GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, SimpleSuperstepVertexInputFormat.class); - GiraphConstants.VERTEX_COMBINER_CLASS.set(conf, - GeneratedVertexMatchCombiner.class); + GiraphConstants.MESSAGE_COMBINER_CLASS.set(conf, + GeneratedVertexMatchMessageCombiner.class); @SuppressWarnings("rawtypes") GiraphConfigurationValidator<?, ?, ?, ?, ?> validator = new GiraphConfigurationValidator(conf); @@ -203,8 +205,8 @@ public class TestComputationTypes { GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class); GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, SimpleSuperstepVertexInputFormat.class); - GiraphConstants.VERTEX_COMBINER_CLASS.set(conf, - GeneratedVertexMismatchCombiner.class); + GiraphConstants.MESSAGE_COMBINER_CLASS.set(conf, + GeneratedVertexMismatchMessageCombiner.class); @SuppressWarnings("rawtypes") GiraphConfigurationValidator<?, ?, ?, ?, ?> validator = new GiraphConfigurationValidator(conf); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java new file mode 100644 index 0000000..cc61441 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.hive.input.vertex.examples; + +import com.facebook.hiveio.common.HiveType; +import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; +import com.google.common.base.Preconditions; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.hive.common.HiveParsing; +import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; + +/** + * Simple HiveToVertex that reads vertices with integer IDs, no vertex values, + * and edges with no values. + */ +public class HiveIntIntNullVertex + extends SimpleHiveToVertex<IntWritable, IntWritable, NullWritable> { + @Override public void checkInput(HiveInputDescription inputDesc, + HiveTableSchema schema) { + Preconditions.checkArgument(schema.columnType(0) == HiveType.INT); + Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST); + } + + @Override + public Iterable<Edge<IntWritable, NullWritable>> getEdges( + HiveReadableRecord record) { + return HiveParsing.parseIntNullEdges(record, 1); + } + + @Override + public IntWritable getVertexId(HiveReadableRecord record) { + return HiveParsing.parseIntID(record, 0); + } + + @Override + public IntWritable getVertexValue(HiveReadableRecord record) { + return new IntWritable(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java index 334f382..7ae8bc3 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java @@ -80,7 
+80,7 @@ import static org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS; import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES; import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS; import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS; -import static org.apache.giraph.conf.GiraphConstants.VERTEX_COMBINER_CLASS; +import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS; import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS; import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS; import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT; @@ -266,8 +266,8 @@ public class HiveJythonUtils { JythonUtils.init(conf, jythonJob.getComputation_name()); - if (jythonJob.getCombiner() != null) { - VERTEX_COMBINER_CLASS.set(conf, jythonJob.getCombiner()); + if (jythonJob.getMessageCombiner() != null) { + MESSAGE_COMBINER_CLASS.set(conf, jythonJob.getMessageCombiner()); } conf.setInt(MIN_WORKERS, jythonJob.getWorkers()); http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java index c75652c..98ba014 100644 --- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java @@ -17,6 +17,8 @@ */ package org.apache.giraph.hive.input; +import com.facebook.hiveio.common.HiveMetastores; +import com.facebook.hiveio.testing.LocalHiveServer; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.hive.GiraphHiveTestBase; import org.apache.giraph.hive.Helpers; @@ -24,16 +26,13 @@ import 
org.apache.giraph.hive.computations.ComputationCountEdges; import org.apache.giraph.hive.computations.ComputationSumEdges; import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat; import org.apache.giraph.hive.input.vertex.examples.HiveIntDoubleDoubleVertex; -import org.apache.giraph.hive.input.vertex.examples.HiveIntNullNullVertex; +import org.apache.giraph.hive.input.vertex.examples.HiveIntIntNullVertex; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.thrift.TException; import org.junit.Before; import org.junit.Test; -import com.facebook.hiveio.common.HiveMetastores; -import com.facebook.hiveio.testing.LocalHiveServer; - import java.io.IOException; import java.util.Map; @@ -66,7 +65,7 @@ public class HiveVertexInputTest extends GiraphHiveTestBase { GiraphConfiguration conf = new GiraphConfiguration(); HIVE_VERTEX_INPUT.setTable(conf, tableName); - HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class); + HIVE_VERTEX_INPUT.setClass(conf, HiveIntIntNullVertex.class); conf.setComputationClass(ComputationCountEdges.class); conf.setVertexInputFormatClass(HiveVertexInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); @@ -99,7 +98,7 @@ public class HiveVertexInputTest extends GiraphHiveTestBase { GiraphConfiguration conf = new GiraphConfiguration(); HIVE_VERTEX_INPUT.setTable(conf, tableName); HIVE_VERTEX_INPUT.setPartition(conf, partition); - HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class); + HIVE_VERTEX_INPUT.setClass(conf, HiveIntIntNullVertex.class); conf.setComputationClass(ComputationCountEdges.class); conf.setVertexInputFormatClass(HiveVertexInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py 
---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py index a87f030..f5cf685 100644 --- a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py +++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from org.apache.giraph.combiner import DoubleSumCombiner +from org.apache.giraph.combiner import DoubleSumMessageCombiner from org.apache.giraph.edge import ByteArrayEdges from org.apache.giraph.jython import JythonJob from org.apache.hadoop.io import IntWritable http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/src/site/xdoc/quick_start.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/quick_start.xml b/src/site/xdoc/quick_start.xml index 5df6555..4c14c75 100644 --- a/src/site/xdoc/quick_start.xml +++ b/src/site/xdoc/quick_start.xml @@ -224,7 +224,7 @@ usage: org.apache.giraph.utils.ConfigurationUtils [-aw <arg>] [-c <arg& <arg>] [-vip <arg>] [-vvf <arg>] [-w <arg>] [-wc <arg>] [-yh <arg>] [-yj <arg>] -aw,--aggregatorWriter <arg> AggregatorWriter class - -c,--combiner <arg> Combiner class + -c,--messageCombiner <arg> Message combiner class -ca,--customArguments <arg> provide custom arguments for the job configuration in the form: -ca <param1>=<value1>,<param2>=<value2>
