Repository: flink
Updated Branches:
  refs/heads/master 0f1775576 -> 233dab497


[FLINK-2561] [gelly] add GraphMetrics Scala example

This closes #1183


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233dab49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233dab49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233dab49

Branch: refs/heads/master
Commit: 233dab497577f0a9443f772bb10390f6dcc005f1
Parents: 9e0284e
Author: vasia <[email protected]>
Authored: Fri Sep 25 11:20:15 2015 +0200
Committer: vasia <[email protected]>
Committed: Tue Sep 29 00:38:20 2015 +0200

----------------------------------------------------------------------
 .../graph/scala/example/GraphMetrics.scala      | 129 +++++++++++++++++++
 .../graph/scala/utils/EdgeToTuple3Map.scala     |   3 +-
 .../graph/scala/utils/Tuple2ToVertexMap.scala   |   3 +-
 .../graph/scala/utils/Tuple3ToEdgeMap.scala     |   3 +-
 .../graph/scala/utils/VertexToTuple2Map.scala   |   3 +-
 .../scala/test/operations/DegreesITCase.scala   |   6 +-
 .../test/operations/GraphMutationsITCase.scala  |  12 +-
 .../test/operations/JoinWithEdgesITCase.scala   |  12 +-
 .../operations/JoinWithVerticesITCase.scala     |   4 +-
 .../scala/test/operations/MapEdgesITCase.scala  |   4 +-
 .../test/operations/MapVerticesITCase.scala     |   4 +-
 .../operations/ReduceOnEdgesMethodsITCase.scala |  10 +-
 .../ReduceOnNeighborMethodsITCase.scala         |   8 +-
 .../flink/graph/example/GraphMetrics.java       |   2 +-
 14 files changed, 164 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
new file mode 100644
index 0000000..68d9285
--- /dev/null
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.scala.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
+ * from the input graph.
+ *
+ * The program computes and prints the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * {{{
+ *   <sourceVertexID>\t<targetVertexID>
+ * }}}
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ */
+object GraphMetrics {
+  /** Computes the metrics of the input (or random) graph and prints them. */
+  def main(args: Array[String]): Unit = {
+    if (parseParameters(args)) {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      /** create the graph **/
+      val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
+
+      /** get the number of vertices **/
+      val vertexCount = graph.numberOfVertices
+
+      /** get the number of edges **/
+      val edgeCount = graph.numberOfEdges
+
+      /** compute the average node degree; convert first to avoid integer division **/
+      val avgDegree = graph.getDegrees.sum(1).map(in => in._2.toDouble / vertexCount)
+
+      /** find the vertex with the maximum in-degree **/
+      val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
+
+      /** find the vertex with the minimum in-degree **/
+      val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
+
+      /** find the vertex with the maximum out-degree **/
+      val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
+
+      /** find the vertex with the minimum out-degree **/
+      val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
+
+      /** print the results **/
+      env.fromElements(vertexCount).printOnTaskManager("Total number of vertices")
+      env.fromElements(edgeCount).printOnTaskManager("Total number of edges")
+      avgDegree.printOnTaskManager("Average node degree")
+      maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+      minInDegreeVertex.printOnTaskManager("Vertex with Min in-degree")
+      maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+      minOutDegreeVertex.printOnTaskManager("Vertex with Min out-degree")
+
+      // printOnTaskManager only registers sinks; the job has to be executed to print
+      env.execute("Graph Metrics Example")
+    }
+  }
+
+  /** Validates the arguments and stores the edges path; returns false on wrong usage. */
+  private def parseParameters(args: Array[String]): Boolean = {
+    args.length match {
+      case 0 =>
+        System.out.println("Executing GraphMetrics example with built-in default data.")
+        System.out.println("  Provide parameters to read input data from a file.")
+        System.out.println("  Usage: GraphMetrics <edges path>")
+        true
+      case 1 =>
+        fileOutput = true
+        edgesPath = args(0)
+        true
+      case _ =>
+        System.err.println("Usage: GraphMetrics <edges path>")
+        false
+    }
+  }
+
+  /**
+   * Returns the edges read from the tab-separated input file when a path was given,
+   * otherwise randomly generated edges between the vertices 1..numVertices.
+   */
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](edgesPath, fieldDelimiter = "\t")
+        .map(in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+    } else {
+      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
+        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+          val numOutEdges = (Math.random() * (numVertices / 2)).toInt
+          for (_ <- 0 until numOutEdges) {
+            val target = ((Math.random() * numVertices) + 1).toLong
+            // the edge must be handed to the collector, otherwise it is discarded
+            out.collect(new Edge[Long, NullValue](key, target, NullValue.getInstance()))
+          }
+        })
+    }
+  }
+
+  private var fileOutput: Boolean = false // true when the edges are read from a file
+  private var edgesPath: String = null // path of the edges file, set by parseParameters
+  private val numVertices = 100 // number of vertices of the generated default graph
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
index 0d7d2af..909dbb4 100644
--- 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.graph.Edge
 
+@SerialVersionUID(1L)
 class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
 
-  private val serialVersionUID: Long = 1L
-
   override def map(value: Edge[K, EV]): (K, K, EV) = {
     (value.getSource, value.getTarget, value.getValue)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
index f2b1133..fd6b8c5 100644
--- 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.graph.Vertex
 
+@SerialVersionUID(1L)
 class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
 
-  private val serialVersionUID: Long = 1L
-
   override def map(value: (K, VV)): Vertex[K, VV] = {
     new Vertex(value._1, value._2)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
index 00cb074..d0e07cc 100644
--- 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.graph.Edge
 
+@SerialVersionUID(1L)
 class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
 
-  private val serialVersionUID: Long = 1L
-
   override def map(value: (K, K, EV)): Edge[K, EV] = {
     new Edge(value._1, value._2, value._3)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
index de77832..faf4e10 100644
--- 
a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
+++ 
b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.graph.Vertex
 
+@SerialVersionUID(1L)
 class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
 
-  private val serialVersionUID: Long = 1L
-
   override def map(value: Vertex[K, VV]): (K, VV) = {
     (value.getId, value.getValue)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
index 6196f99..b347049 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -40,7 +40,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.inDegrees().collect.toList
+    val res = graph.inDegrees.collect().toList
     expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -51,7 +51,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.outDegrees().collect.toList
+    val res = graph.outDegrees.collect().toList
     expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.getDegrees().collect.toList
+    val res = graph.getDegrees.collect().toList
     expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
index 3cb92c4..4b776e2 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -119,7 +119,7 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
-    val res = newgraph.getEdges.collect.toList
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + 
"3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -146,7 +146,7 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new 
Vertex[Long, Long](1L, 1L),
         new Vertex[Long, Long](6L, 6L)))
-    val res = newgraph.getEdges.collect.toList
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
@@ -159,7 +159,7 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new 
Vertex[Long, Long](1L,
       1L), 61L)
-    val res = newgraph.getEdges.collect.toList
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + 
"3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n" + "6,1,61\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -201,7 +201,7 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new 
Vertex[Long, Long](2L,
       2L), 12L)
-    val res = newgraph.getEdges.collect.toList
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + 
"3,4,34\n" + "3,5," +
       "35\n" + "4,5,45\n" + "5,1,51\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -214,7 +214,7 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
-    val res = newgraph.getEdges.collect.toList
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + 
"3,5,35\n" + "4,5,45\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
@@ -226,7 +226,7 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
-    val res = newgraph.getEdges.collect.toList
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + 
"3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index eae8bd5..3dc90fc 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -45,7 +45,7 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val result: Graph[Long, Long, Long] = 
graph.joinWithEdges(graph.getEdges.map(new
         EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
-    val res = result.getEdges.collect.toList
+    val res = result.getEdges.collect().toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + 
"3,5,70\n" + "4,5," +
       "90\n" + "5,1,102\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = 
graph.joinWithEdges(graph.getEdges.map(new
         EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) 
=>
       originalValue + tupleValue)
-    val res = result.getEdges.collect.toList
+    val res = result.getEdges.collect().toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + 
"3,5,70\n" + "4,5," +
       "90\n" + "5,1,102\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -75,7 +75,7 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = 
graph.joinWithEdgesOnSource[Long](graph.getEdges
       .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: 
Long) =>
       originalValue + tupleValue)
-    val res = result.getEdges.collect.toList
+    val res = result.getEdges.collect().toList
     expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + 
"3,5,69\n" + "4,5," +
       "90\n" + "5,1,102\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -90,7 +90,7 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = 
graph.joinWithEdgesOnSource[Long](graph.getEdges
       .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: 
Long) =>
       originalValue + tupleValue)
-    val res = result.getEdges.collect.toList
+    val res = result.getEdges.collect().toList
     expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + 
"3,5,69\n" + "4,5," +
       "90\n" + "5,1,102\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -105,7 +105,7 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = 
graph.joinWithEdgesOnTarget[Long](graph.getEdges
       .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: 
Long) =>
       originalValue + tupleValue)
-    val res = result.getEdges.collect.toList
+    val res = result.getEdges.collect().toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + 
"3,5,70\n" + "4,5," +
       "80\n" + "5,1,102\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -120,7 +120,7 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = 
graph.joinWithEdgesOnTarget[Long](graph.getEdges
       .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: 
Long) =>
       originalValue + tupleValue)
-    val res = result.getEdges.collect.toList
+    val res = result.getEdges.collect().toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + 
"3,5,70\n" + "4,5," +
       "80\n" + "5,1,102\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index 8d18d58..98ee8b6 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -44,7 +44,7 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val result: Graph[Long, Long, Long] = 
graph.joinWithVertices(graph.getVertices.map(new
         VertexToTuple2Map[Long, Long]), new AddValuesMapper)
-    val res = result.getVertices.collect.toList
+    val res = result.getVertices.collect().toList
     expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
@@ -58,7 +58,7 @@ MultipleProgramsTestBase(mode) {
     val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
     val result: Graph[Long, Long, Long] = 
graph.joinWithVertices[Long](tupleSet,
       (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
-    val res = result.getVertices.collect.toList
+    val res = result.getVertices.collect().toList
     expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
index 0fa8d2b..bdfd569 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList
+    val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList
     expectedResult = "1,2,13\n" +
       "1,3,14\n" + "" +
       "2,3,24\n" +
@@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val res = graph.mapEdges(edge => edge.getValue + 1)
-      .getEdges.collect.toList
+      .getEdges.collect().toList
     expectedResult = "1,2,13\n" +
       "1,3,14\n" + "" +
       "2,3,24\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
index c1ab3ea..2e51d90 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList
+    val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList
     expectedResult = "1,2\n" +
       "2,3\n" +
       "3,4\n" +
@@ -57,7 +57,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.mapVertices(vertex => vertex.getValue + 
1).getVertices.collect.toList
+    val res = graph.mapVertices(vertex => vertex.getValue + 
1).getVertices.collect().toList
     expectedResult = "1,2\n" +
       "2,3\n" +
       "3,4\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
index 695f74a..dcd1deb 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -43,7 +43,7 @@ class ReduceOnEdgesMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecutionMod
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
-      EdgeDirection.ALL).collect.toList
+      EdgeDirection.ALL).collect().toList
     expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -56,7 +56,7 @@ class ReduceOnEdgesMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecutionMod
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
-    .collect.toList
+    .collect().toList
     expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" 
+
       "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
       "(5,1)\n" + "(5,3)\n" + "(5,4)"
@@ -71,7 +71,7 @@ class ReduceOnEdgesMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecutionMod
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = 
graph.reduceOnEdges(new
         SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
-    val res = verticesWithLowestOutNeighbor.collect.toList
+    val res = verticesWithLowestOutNeighbor.collect().toList
     expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + 
"(5,51)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -84,7 +84,7 @@ class ReduceOnEdgesMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecutionMod
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = 
graph.reduceOnEdges(new
         SelectMinWeightNeighborNoValue, EdgeDirection.IN)
-    val res = verticesWithLowestOutNeighbor.collect.toList
+    val res = verticesWithLowestOutNeighbor.collect().toList
     expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + 
"(5,35)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -97,7 +97,7 @@ class ReduceOnEdgesMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecutionMod
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = 
graph.reduceOnEdges(new
         SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
-    val res = verticesWithMaxEdgeWeight.collect.toList
+    val res = verticesWithMaxEdgeWeight.collect().toList
     expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + 
"(5,51)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
index b01e750..aef5493 100644
--- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
+++ 
b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -43,7 +43,7 @@ class ReduceOnNeighborMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecution
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL)
-    .collect.toList
+    .collect().toList
     expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + 
"(5,8)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -54,7 +54,7 @@ class ReduceOnNeighborMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecution
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
-    val res = graph.reduceOnNeighbors(new SumNeighbors, 
EdgeDirection.OUT).collect.toList
+    val res = graph.reduceOnNeighbors(new SumNeighbors, 
EdgeDirection.OUT).collect().toList
     expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -66,7 +66,7 @@ class ReduceOnNeighborMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecution
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, 
EdgeDirection.ALL)
-    val res = result.collect.toList
+    val res = result.collect().toList
     expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + 
"(5,13)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
@@ -79,7 +79,7 @@ class ReduceOnNeighborMethodsITCase(mode: 
MultipleProgramsTestBase.TestExecution
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), 
env)
     val result = graph.groupReduceOnNeighbors(new
         SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
-    val res = result.collect.toList
+    val res = result.collect().toList
     expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + 
"(5,570)\n" + "(5,285)"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 591ed26..b808e76 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
 
 /**
- * This example illustrate how to use Gelly metrics methods and get simple 
statistics
+ * This example illustrates how to use Gelly metrics methods and get simple 
statistics
  * from the input graph.  
  * 
  * The program creates a random graph and computes and prints

Reply via email to