[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14590690#comment-14590690 ]
ASF GitHub Bot commented on FLINK-1520: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32679424 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.NullValue; +import org.apache.flink.core.fs.Path; + + +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ + + + +public class GraphCsvReader{ + + private final Path path1,path2; + private final ExecutionEnvironment executionContext; + + private Path edgePath; + private Path vertexPath; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction mapper; + +//-------------------------------------------------------------------------------------------------------------------- + + public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) + { + this.path1 = path1; + this.path2 = path2; + this.VertexReader = new CsvReader(path1,context); + this.EdgeReader = new CsvReader(path2,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = mapper; + this.executionContext=context; + } + + public GraphCsvReader (String path2,ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + + } + + public GraphCsvReader(String path1, String path2, ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + } + + + public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context) + { + + this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context); + + + } + + public CsvReader getEdgeReader() + { + return this.EdgeReader; + } + + public CsvReader getVertexReader() + { + 
return this.VertexReader; + } + //-------------------------------------------------------------------------------------------------------------------- + + /** + * Specifies the types for the Graph fields and returns a Graph with those field types + * + * This method is overloaded for the case in which Vertices don't have a value + * + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data and the type of Vetex ID in the returned Graph. + * @param type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph. + * @param type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data. + */ + public <K,VV,EV> Graph<K,VV,EV> types(Class<K> type0, Class<VV> type1, Class<EV> type2) + { + DataSet<Tuple3<K,K,EV>> edges = this.EdgeReader.types(type0,type0,type2); + if(path1!=null) + { + DataSet<Tuple2<K,VV>> vertices = this.VertexReader.types(type0,type1); + return Graph.fromTupleDataSet(vertices,edges,executionContext); + } + else + { + return Graph.fromTupleDataSet(edges,mapper,executionContext); --- End diff -- Yes, you need to define K and VV in the class definition if you want to use them here, like `GraphCsvReader<K, VV, EV>`. If you find that the types are not needed, you can simply suppress the warning. > Read edges and vertices from CSV files > -------------------------------------- > > Key: FLINK-1520 > URL: https://issues.apache.org/jira/browse/FLINK-1520 > Project: Flink > Issue Type: New Feature > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Shivani Ghatge > Priority: Minor > Labels: easyfix, newbie > > Add methods to create Vertex and Edge Datasets directly from CSV file inputs. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)