Repository: flink Updated Branches: refs/heads/master 0f1775576 -> 233dab497
[FLINK-2561] [gelly] add GraphMetrics Scala example This closes #1183 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233dab49 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233dab49 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233dab49 Branch: refs/heads/master Commit: 233dab497577f0a9443f772bb10390f6dcc005f1 Parents: 9e0284e Author: vasia <[email protected]> Authored: Fri Sep 25 11:20:15 2015 +0200 Committer: vasia <[email protected]> Committed: Tue Sep 29 00:38:20 2015 +0200 ---------------------------------------------------------------------- .../graph/scala/example/GraphMetrics.scala | 129 +++++++++++++++++++ .../graph/scala/utils/EdgeToTuple3Map.scala | 3 +- .../graph/scala/utils/Tuple2ToVertexMap.scala | 3 +- .../graph/scala/utils/Tuple3ToEdgeMap.scala | 3 +- .../graph/scala/utils/VertexToTuple2Map.scala | 3 +- .../scala/test/operations/DegreesITCase.scala | 6 +- .../test/operations/GraphMutationsITCase.scala | 12 +- .../test/operations/JoinWithEdgesITCase.scala | 12 +- .../operations/JoinWithVerticesITCase.scala | 4 +- .../scala/test/operations/MapEdgesITCase.scala | 4 +- .../test/operations/MapVerticesITCase.scala | 4 +- .../operations/ReduceOnEdgesMethodsITCase.scala | 10 +- .../ReduceOnNeighborMethodsITCase.scala | 8 +- .../flink/graph/example/GraphMetrics.java | 2 +- 14 files changed, 164 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala new file mode 100644 index 0000000..68d9285 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.graph.scala.example + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.util.Collector + +/** + * This example illustrates how to use Gelly metrics methods and get simple statistics + * from the input graph. + * + * The program creates a random graph and computes and prints + * the following metrics: + * - number of vertices + * - number of edges + * - average node degree + * - the vertex ids with the max/min in- and out-degrees + * + * The input file is expected to contain one edge per line, + * with long IDs and no values, in the following format: + * {{{ + * <sourceVertexID>\t<targetVertexID> + * }}} + * If no arguments are provided, the example runs with a random graph of 100 vertices. + * + */ +object GraphMetrics { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + /** create the graph **/ + val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env) + + /** get the number of vertices **/ + val numVertices = graph.numberOfVertices; + + /** get the number of edges **/ + val numEdges = graph.numberOfEdges; + + /** compute the average node degree **/ + val verticesWithDegrees = graph.getDegrees; + val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble) + + /** find the vertex with the maximum in-degree **/ + val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) + + /** find the vertex with the minimum in-degree **/ + val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1) + + /** find the vertex with the maximum out-degree **/ + val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1) + + /** find the vertex with the minimum out-degree **/ + val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1) + + /** print the results **/ + env.fromElements(numVertices).printOnTaskManager("Total number of vertices") + env.fromElements(numEdges).printOnTaskManager("Total number of edges") + avgDegree.printOnTaskManager("Average node degree") + maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") + minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") + maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") + minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") + + } + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 1) { + edgesPath = args(0) + true + } else { + System.err.println("Usage: GraphMetrics <edges path>") + false + } + } else { + System.out.println("Executing GraphMetrics example with built-in default data.") + System.out.println(" Provide parameters to read input data from a file.") + System.out.println(" Usage: GraphMetrics <edges path>") + true + } + } + + private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)]( + edgesPath, + fieldDelimiter = "\t").map( + in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) + } + else { + env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]]( + (key: Long, out: Collector[Edge[Long, NullValue]]) => { + val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt + for ( i <- 0 to numOutEdges ) { + var target: Long = ((Math.random() * numVertices) + 1).toLong + new Edge[Long, NullValue](key, target, NullValue.getInstance()) + } + }) + } + } + + private var fileOutput: Boolean = false + private var edgesPath: String = null + private var outputPath: String = null + private val numVertices = 100 +} http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala index 0d7d2af..909dbb4 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Edge +@SerialVersionUID(1L) class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] { - private val serialVersionUID: Long = 1L - override def map(value: Edge[K, EV]): (K, K, EV) = { (value.getSource, value.getTarget, value.getValue) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala index f2b1133..fd6b8c5 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Vertex +@SerialVersionUID(1L) class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] { - private val serialVersionUID: Long = 1L - override def map(value: (K, VV)): Vertex[K, VV] = { new Vertex(value._1, value._2) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala index 00cb074..d0e07cc 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Edge +@SerialVersionUID(1L) class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] { - private val serialVersionUID: Long = 1L - override def map(value: (K, K, EV)): Edge[K, EV] = { new Edge(value._1, value._2, value._3) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala index de77832..faf4e10 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Vertex +@SerialVersionUID(1L) class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] { - private val serialVersionUID: Long = 1L - override def map(value: Vertex[K, VV]): (K, VV) = { (value.getId, value.getValue) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala index 6196f99..b347049 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala @@ -40,7 +40,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.inDegrees().collect.toList + val res = graph.inDegrees.collect().toList expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -51,7 +51,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.outDegrees().collect.toList + val res = graph.outDegrees.collect().toList expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getDegrees().collect.toList + val res = graph.getDegrees.collect().toList expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala index 3cb92c4..4b776e2 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala @@ -119,7 +119,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L)) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -146,7 +146,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](6L, 6L))) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -159,7 +159,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L, 1L), 61L) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + "6,1,61\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -201,7 +201,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L, 2L), 12L) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," + "35\n" + "4,5,45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -214,7 +214,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L)) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -226,7 +226,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L)) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala index eae8bd5..3dc90fc 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala @@ -45,7 +45,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), new AddValuesMapper) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -75,7 +75,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -90,7 +90,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -105,7 +105,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -120,7 +120,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala index 8d18d58..98ee8b6 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala @@ -44,7 +44,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new VertexToTuple2Map[Long, Long]), new AddValuesMapper) - val res = result.getVertices.collect.toList + val res = result.getVertices.collect().toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -58,7 +58,7 @@ MultipleProgramsTestBase(mode) { val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long]) val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet, (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue) - val res = result.getVertices.collect.toList + val res = result.getVertices.collect().toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala index 0fa8d2b..bdfd569 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala @@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList + val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + @@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.mapEdges(edge => edge.getValue + 1) - .getEdges.collect.toList + .getEdges.collect().toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala index c1ab3ea..2e51d90 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala @@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList + val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + @@ -57,7 +57,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList + val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala index 695f74a..dcd1deb 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala @@ -43,7 +43,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, - EdgeDirection.ALL).collect.toList + EdgeDirection.ALL).collect().toList expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -56,7 +56,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) - .collect.toList + .collect().toList expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" + "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" + "(5,1)\n" + "(5,3)\n" + "(5,4)" @@ -71,7 +71,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.OUT) - val res = verticesWithLowestOutNeighbor.collect.toList + val res = verticesWithLowestOutNeighbor.collect().toList expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -84,7 +84,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.IN) - val res = verticesWithLowestOutNeighbor.collect.toList + val res = verticesWithLowestOutNeighbor.collect().toList expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -97,7 +97,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue, EdgeDirection.ALL) - val res = verticesWithMaxEdgeWeight.collect.toList + val res = verticesWithMaxEdgeWeight.collect().toList expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala index b01e750..aef5493 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala @@ -43,7 +43,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL) - .collect.toList + .collect().toList expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -54,7 +54,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList + val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -66,7 +66,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL) - val res = result.collect.toList + val res = result.collect().toList expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -79,7 +79,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN) - val res = result.collect.toList + val res = result.collect().toList expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java index 591ed26..b808e76 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java @@ -30,7 +30,7 @@ import org.apache.flink.graph.example.utils.ExampleUtils; import org.apache.flink.types.NullValue; /** - * This example illustrate how to use Gelly metrics methods and get simple statistics + * This example illustrates how to use Gelly metrics methods and get simple statistics * from the input graph. * * The program creates a random graph and computes and prints
