[ https://issues.apache.org/jira/browse/FLINK-2561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14947109#comment-14947109 ]
ASF GitHub Bot commented on FLINK-2561: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1211#discussion_r41410322 --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.example; + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.Edge +import org.apache.flink.types.NullValue +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.library.GSAConnectedComponents +import java.lang.Long + +/** + * This example shows how to use Gelly's library methods. + * You can find all available library methods in [[org.apache.flink.graph.library]]. + * + * In particular, this example uses the + * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]] + * library method to compute the connected components of the input graph. 
+ * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\n1\t3\n</code> defines two edges, + * 1-2 and 1-3. + * + * Usage {{ + * ConnectedComponents <edge path> <result path> <number of iterations> + * }} + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]] + */ +object ConnectedComponents { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env) + + val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations)) + + + // emit result + if (fileOutput) { + components.writeAsCsv(outputPath, "\n", ",") + env.execute("Connected Components Example") + } else { + components.print() + } + } + + private final class InitVertices extends MapFunction[Long, Long] { + override def map(id: Long) = {id} + } + + // *********************************************************************** + // UTIL METHODS + // *********************************************************************** + + private var fileOutput = false + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>") + false + } + fileOutput = true + edgesInputPath = args(0) + outputPath = args(1) + maxIterations = (2).toInt + } else { + System.out.println("Executing 
ConnectedComponents example with default parameters" + " and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage ConnectedComponents <edge path> <output path> " + "<num iterations>"); + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) + } else { + val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map { + case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) --- End diff -- indent this? > Sync Gelly Java and Scala APIs > ------------------------------ > > Key: FLINK-2561 > URL: https://issues.apache.org/jira/browse/FLINK-2561 > Project: Flink > Issue Type: Task > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Vasia Kalavri > Fix For: 0.10 > > > Some functionality and tests are missing from the Gelly Scala API. These > should be added, together with documentation, a completeness test and some > usage examples. -- This message was sent by Atlassian JIRA (v6.3.4#6332)