[FLINK-1201] [gelly] reduceOnNeighbors with vertex value
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b2c96d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b2c96d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b2c96d9 Branch: refs/heads/master Commit: 4b2c96d9cc87237b60e92855848d822e411e4472 Parents: b5b63d7 Author: vasia <vasilikikala...@gmail.com> Authored: Sun Dec 21 15:46:40 2014 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:13 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 130 +++++++++ .../apache/flink/graph/NeighborsFunction.java | 13 + .../graph/NeighborsFunctionWithVertexValue.java | 12 + .../flink/graph/test/TestNeighborMethods.java | 290 ------------------- .../graph/test/TestReduceOnEdgesMethods.java | 290 +++++++++++++++++++ .../graph/test/TestReduceOnNeighborMethods.java | 157 ++++++++++ 6 files changed, 602 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 48b45de..f2f62e3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -41,6 +41,7 @@ import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import 
org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -451,6 +452,15 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } + private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, + VV extends Serializable, EV extends Serializable> implements FlatMapFunction< + Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> { + public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) { + out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge)); + out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge)); + } + } + private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, T>>, @@ -978,4 +988,124 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return algorithm.run(this); } + /** + * Compute an aggregate over the neighbors (edges and vertices) of each vertex. + * The function applied on the neighbors has access to the vertex value. 
+ * @param neighborsFunction the function to apply to the neighborhood + * @param direction the edge direction (in-, out-, all-) + * @return a dataset of a Tuple2 with the vertex id and the computed value + * @throws IllegalArgumentException + */ + public <T> DataSet<Tuple2<K, T>> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction, + EdgeDirection direction) throws IllegalArgumentException { + switch (direction) { + case IN: + // create <edge-sourceVertex> pairs + DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges.join(this.vertices) + .where(0).equalTo(0); + return vertices.coGroup(edgesWithSources).where(0).equalTo("f0.f1").with( + new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); + case OUT: + // create <edge-targetVertex> pairs + DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges.join(this.vertices) + .where(1).equalTo(0); + return vertices.coGroup(edgesWithTargets).where(0).equalTo("f0.f0").with( + new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); + case ALL: + // create <edge-sourceOrTargetVertex> pairs + DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges.flatMap( + new EmitOneEdgeWithNeighborPerNode<K, VV, EV>()).join(this.vertices) + .where(1).equalTo(0).with(new ProjectEdgeWithNeighbor<K, VV, EV>()); + + return vertices.coGroup(edgesWithNeighbors).where(0).equalTo(0) + .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + + private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, + VV extends Serializable, EV extends Serializable> implements + FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { + public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, + Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) { + 
out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, + keysWithEdge.f2, neighbor)); + } + } + + private static final class ApplyNeighborCoGroupFunction<K extends Comparable<K> & Serializable, + VV extends Serializable, EV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, Tuple2<K, T>>, + ResultTypeQueryable<Tuple2<K, T>> { + + private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; + + public ApplyNeighborCoGroupFunction (NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) { + this.function = fun; + } + public void coGroup(Iterable<Vertex<K, VV>> vertex, + Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<Tuple2<K, T>> out) throws Exception { + out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); + } + @Override + public TypeInformation<Tuple2<K, T>> getProducedType() { + return new TupleTypeInfo<Tuple2<K, T>>(keyType, + TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, + function.getClass(), 3, null, null)); + } + } + + private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends Comparable<K> & Serializable, + VV extends Serializable, EV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, Tuple2<K, T>>, + ResultTypeQueryable<Tuple2<K, T>> { + + private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; + + public ApplyCoGroupFunctionOnAllNeighbors (NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) { + this.function = fun; + } + + public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, + Collector<Tuple2<K, T>> out) throws Exception { + + final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() { + + final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithEdgesIterator = + keysWithNeighbors.iterator(); + + 
@Override + public boolean hasNext() { + return keysWithEdgesIterator.hasNext(); + } + + @Override + public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() { + Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next(); + return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2); + } + + @Override + public void remove() { + keysWithEdgesIterator.remove(); + } + }; + + Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() { + public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() { + return neighborsIterator; + } + }; + + out.collect(function.iterateNeighbors(vertex.iterator().next(), neighborsIterable)); + } + + @Override + public TypeInformation<Tuple2<K, T>> getProducedType() { + return new TupleTypeInfo<Tuple2<K, T>>(keyType, + TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, + function.getClass(), 3, null, null)); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java new file mode 100644 index 0000000..5158078 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -0,0 +1,13 @@ +package flink.graphs; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; + +public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable, O> extends Function, Serializable { + + Tuple2<K, O> iterateEdges(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java new file mode 100644 index 0000000..ae7ef2e --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -0,0 +1,12 @@ +package flink.graphs; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.tuple.Tuple2; + +public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable, O> extends Function, Serializable { + + Tuple2<K, O> iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java deleted file mode 100644 index 6a97767..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java +++ /dev/null @@ -1,290 +0,0 @@ -package flink.graphs; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import 
org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -public class TestNeighborMethods extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 6; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public TestNeighborMethods(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - private static class GraphProgs { - - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { - 
- public Tuple2<Long, Long> iterateEdges( - Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getTarget(); - } - } - return new Tuple2<Long, Long>(v.getId(), minNeighorId); - } - }, EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - case 2: { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { - - public Tuple2<Long, Long> iterateEdges( - Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getSource(); - } - } - return new Tuple2<Long, Long>(v.getId(), minNeighorId); - } - }, EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - } - case 3: { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { - - public Tuple2<Long, 
Long> iterateEdges(Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MIN_VALUE; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() > weight) { - weight = edge.getValue(); - } - } - return new Tuple2<Long, Long>(v.getId(), weight); - } - }, EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - return "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - } - case 4: { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getTarget(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, minNeighorId); - } - }, EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - case 5: { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getSource(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, minNeighorId); - } - }, EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - } - case 6: { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MIN_VALUE; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() > weight) { - weight = edge.f1.getValue(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, weight); - } - }, EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - return "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java new file mode 100644 index 0000000..1fdccfa --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java @@ -0,0 +1,290 @@ +package flink.graphs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestReduceOnEdgesMethods extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 6; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public TestReduceOnEdgesMethods(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = GraphProgs.runProgram(curProgId, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class GraphProgs { + + @SuppressWarnings("serial") + 
public static String runProgram(int progId, String resultPath) throws Exception { + + switch(progId) { + case 1: { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getTarget(); + } + } + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + }, EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + return "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + case 2: { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getSource(); + } + } + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + }, EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + 
env.execute(); + return "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + case 3: { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { + + public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MIN_VALUE; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() > weight) { + weight = edge.getValue(); + } + } + return new Tuple2<Long, Long>(v.getId(), weight); + } + }, EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + return "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + case 4: { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getTarget(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + }, EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + 
return "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + case 5: { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getSource(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + }, EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + return "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + case 6: { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MIN_VALUE; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() > weight) { + weight = edge.f1.getValue(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, weight); + } + }, EdgeDirection.ALL); + 
verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + return "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java new file mode 100644 index 0000000..b8927aa --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java @@ -0,0 +1,157 @@ +package flink.graphs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestReduceOnNeighborMethods extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 3; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public TestReduceOnNeighborMethods(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = GraphProgs.runProgram(curProgId, resultPath); + } + + 
@Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class GraphProgs { + + @SuppressWarnings("serial") + public static String runProgram(int progId, String resultPath) throws Exception { + + switch(progId) { + case 1: { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() { + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + }, EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + return "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + case 2: { + /* + * Get the sum of in-neighbor values + * times edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long,
Long>> verticesWithSum = + graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() { + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + }, EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + return "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + } + case 3: { + /* + * Get the sum of all neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() { + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + }, EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + return "1,10\n" + + "2,4\n" + + "3,12\n" + + "4,8\n" + + "5,8\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } +} \ No newline at end of file