[ https://issues.apache.org/jira/browse/FLINK-2561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14947128#comment-14947128 ]
ASF GitHub Bot commented on FLINK-2561: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1211#discussion_r41411193 --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala --- @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.example; + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.spargel.VertexUpdateFunction +import org.apache.flink.graph.spargel.MessageIterator +import org.apache.flink.graph.Vertex +import org.apache.flink.graph.spargel.MessagingFunction +import scala.collection.JavaConversions._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData + +/** + * This example shows how to use Gelly's vertex-centric iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. + * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]] + */ +object SingleSourceShortestPaths { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) + + // Execute the vertex-centric iteration + val result = graph.runVertexCentricIteration(new VertexDistanceUpdater, + new MinDistanceMessenger, maxIterations) + + // Extract the vertices as the result + val singleSourceShortestPaths = result.getVertices + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") + env.execute("Single Source Shortest Paths Example") + } else { + singleSourceShortestPaths.print() + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { + + override def map(id: Long) = { + if (id.equals(srcId)) { + 0.0 + } else { + Double.PositiveInfinity + } + } + } + + /** + * Function that updates the value of a vertex by picking the minimum + * distance from all incoming messages. + */ + private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] { + + override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) { + var minDistance = Double.MaxValue + while (inMessages.hasNext) { + var msg = inMessages.next + if (msg < minDistance) { + minDistance = msg + } + } + if (vertex.getValue > minDistance) { + setNewVertexValue(minDistance) + } + } + } + + /** + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. + */ + private final class MinDistanceMessenger extends + MessagingFunction[Long, Double, Double, Double] { + + override def sendMessages(vertex: Vertex[Long, Double]) { + for (edge: Edge[Long, Double] <- getEdges) { + sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue) + } + } + } + + // **************************************************************************** + // UTIL METHODS + // **************************************************************************** + + private var fileOutput = false + private var srcVertexId = 1L + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = 5 + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>") + false + } + fileOutput = true --- End diff -- indention +2 spaces? > Sync Gelly Java and Scala APIs > ------------------------------ > > Key: FLINK-2561 > URL: https://issues.apache.org/jira/browse/FLINK-2561 > Project: Flink > Issue Type: Task > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Vasia Kalavri > Fix For: 0.10 > > > There is some functionality and tests missing from the Gelly Scala API. This > should be added, together with documentation, a completeness test and some > usage examples. -- This message was sent by Atlassian JIRA (v6.3.4#6332)