Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1004#discussion_r37273914
  
    --- Diff: 
flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
 ---
    @@ -0,0 +1,735 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.scala
    +
    +import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.{tuple => jtuple}
    +import org.apache.flink.api.scala._
    +import org.apache.flink.graph._
    +import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, 
GatherFunction, SumFunction}
    +import org.apache.flink.graph.spargel.{MessagingFunction, 
VertexCentricConfiguration, VertexUpdateFunction}
    +import org.apache.flink.{graph => jg}
    +
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.reflect.ClassTag
    +
    +object Graph {
    +  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
    +  TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: 
DataSet[Edge[K, EV]],
    +                              env: ExecutionEnvironment): Graph[K, VV, EV] 
= {
    +    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, 
edges.javaSet, env.getJavaEnv))
    +  }
    +
    +  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
    +  TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: 
Seq[Edge[K, EV]], env:
    +  ExecutionEnvironment): Graph[K, VV, EV] = {
    +    wrapGraph(jg.Graph.fromCollection[K, VV, 
EV](vertices.asJavaCollection, edges
    +      .asJavaCollection, env.getJavaEnv))
    +  }
    +}
    +
    +/**
    + * Represents a graph consisting of {@link Edge edges} and {@link Vertex 
vertices}.
    + * @param jgraph the underlying java api Graph.
    + * @tparam K the key type for vertex and edge identifiers
    + * @tparam VV the value type for vertices
    + * @tparam EV the value type for edges
    + * @see org.apache.flink.graph.Edge
    + * @see org.apache.flink.graph.Vertex
    + */
    +final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
    +TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    +
    +  private[flink] def getWrappedGraph = jgraph
    +
    +
    +  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
    +    if (jgraph.getContext.getConfig.isClosureCleanerEnabled) {
    +      ClosureCleaner.clean(f, checkSerializable)
    +    }
    +    ClosureCleaner.ensureSerializable(f)
    +    f
    +  }
    +
    +  /**
    +   * @return the vertex DataSet.
    +   */
    +  def getVertices = wrap(jgraph.getVertices)
    +
    +  /**
    +   * @return the edge DataSet.
    +   */
    +  def getEdges = wrap(jgraph.getEdges)
    +
    +  /**
    +   * @return the vertex DataSet as Tuple2.
    +   */
    +  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
    +    wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
    +  }
    +
    +  /**
    +   * @return the edge DataSet as Tuple3.
    +   */
    +  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
    +    wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, 
jtuple.f2))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each vertex in the graph.
    +   *
    +   * @param mapper the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapVertices[NV: TypeInformation : ClassTag](mapper: 
MapFunction[Vertex[K, VV], NV]):
    +  Graph[K, NV, EV] = {
    +    new Graph[K, NV, EV](jgraph.mapVertices[NV](
    +      mapper,
    +      createTypeInformation[Vertex[K, NV]]
    +    ))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each vertex in the graph.
    +   *
    +   * @param fun the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => 
NV): Graph[K, NV, EV] = {
    +    val mapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, 
VV], NV] {
    +      val cleanFun = clean(fun)
    +
    +      def map(in: Vertex[K, VV]): NV = cleanFun(in)
    +    }
    +    new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper, 
createTypeInformation[Vertex[K, NV]]))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each edge in the graph.
    +   *
    +   * @param mapper the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, 
EV], NV]): Graph[K,
    +    VV, NV] = {
    +    new Graph[K, VV, NV](jgraph.mapEdges[NV](
    +      mapper,
    +      createTypeInformation[Edge[K, NV]]
    +    ))
    +  }
    +
    +  /**
    +   * Apply a function to the attribute of each edge in the graph.
    +   *
    +   * @param fun the map function to apply.
    +   * @return a new graph
    +   */
    +  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): 
Graph[K, VV, NV] = {
    +    val mapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, 
EV], NV] {
    +      val cleanFun = clean(fun)
    +
    +      def map(in: Edge[K, EV]): NV = cleanFun(in)
    +    }
    +    new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, 
createTypeInformation[Edge[K, NV]]))
    +  }
    +
    +  /**
    +   * Joins the vertex DataSet of this graph with an input DataSet and 
applies
    +   * a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @return a new graph where the vertex values have been updated.
    +   */
    +  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], 
mapper: MapFunction[
    +    (VV, T), VV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
    +      override def map(value: jtuple.Tuple2[VV, T]): VV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the vertex DataSet of this graph with an input DataSet and 
applies
    +   * a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @return a new graph where the vertex values have been updated.
    +   */
    +  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], 
fun: (VV, T) => VV):
    +  Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[VV, T]): VV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on a composite key of 
both
    +   * source and target and applies a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], 
mapper: MapFunction[
    +    (EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple3(scalatuple._1,
    +      scalatuple._2, scalatuple._3)).javaSet
    +    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on a composite key of 
both
    +   * source and target and applies a UDF on the resulted values.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], 
fun: (EV, T) => EV):
    +  Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple3(scalatuple._1,
    +      scalatuple._2, scalatuple._3)).javaSet
    +    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the source key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. In case the inputDataSet contains the same key 
more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], mapper:
    +  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the source key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. In case the inputDataSet contains the same key 
more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], fun: (EV, T) =>
    +    EV): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the target key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. Should the inputDataSet contain the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param mapper the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], mapper:
    +  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        mapper.map((value.f0, value.f1))
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Joins the edge DataSet with an input DataSet on the target key of the
    +   * edges and the first attribute of the input DataSet and applies a UDF 
on
    +   * the resulted values. Should the inputDataSet contain the same key more
    +   * than once, only the first value will be considered.
    +   *
    +   * @param inputDataSet the DataSet to join with.
    +   * @param fun the UDF map function to apply.
    +   * @tparam T the return type
    +   * @return a new graph where the edge values have been updated.
    +   */
    +  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, 
T)], fun: (EV, T) =>
    +    EV): Graph[K, VV, EV] = {
    +    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
    +      val cleanFun = clean(fun)
    +
    +      override def map(value: jtuple.Tuple2[EV, T]): EV = {
    +        cleanFun(value.f0, value.f1)
    +      }
    +    }
    +    val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
    +      scalatuple._2)).javaSet
    +    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
    +  }
    +
    +  /**
    +   * Apply filtering functions to the graph and return a sub-graph that
    +   * satisfies the predicates for both vertices and edges.
    +   *
    +   * @param vertexFilter the filter function for vertices.
    +   * @param edgeFilter the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: 
FilterFunction[Edge[K,
    +    EV]]) = {
    +    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
    +  }
    +
    +  /**
    +   * Apply filtering functions to the graph and return a sub-graph that
    +   * satisfies the predicates for both vertices and edges.
    +   *
    +   * @param vertexFilterFun the filter function for vertices.
    +   * @param edgeFilterFun the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, edgeFilterFun: 
Edge[K, EV] =>
    +    Boolean) = {
    +    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
    +      val cleanVertexFun = clean(vertexFilterFun)
    +
    +      override def filter(value: Vertex[K, VV]): Boolean = 
cleanVertexFun(value)
    +    }
    +
    +    val edgeFilter = new FilterFunction[Edge[K, EV]] {
    +      val cleanEdgeFun = clean(edgeFilterFun)
    +
    +      override def filter(value: Edge[K, EV]): Boolean = 
cleanEdgeFun(value)
    +    }
    +
    +    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the vertices.
    +   *
    +   * @param vertexFilter the filter function for vertices.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]) = {
    +    wrapGraph(jgraph.filterOnVertices(vertexFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the vertices.
    +   *
    +   * @param vertexFilterFun the filter function for vertices.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean) = {
    +    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
    +      val cleanVertexFun = clean(vertexFilterFun)
    +
    +      override def filter(value: Vertex[K, VV]): Boolean = 
cleanVertexFun(value)
    +    }
    +
    +    wrapGraph(jgraph.filterOnVertices(vertexFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the edges.
    +   *
    +   * @param edgeFilter the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]) = {
    +    wrapGraph(jgraph.filterOnEdges(edgeFilter))
    +  }
    +
    +  /**
    +   * Apply a filtering function to the graph and return a sub-graph that
    +   * satisfies the predicates only for the edges.
    +   *
    +   * @param edgeFilterFun the filter function for edges.
    +   * @return the resulting sub-graph.
    +   */
    +  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean) = {
    +    val edgeFilter = new FilterFunction[Edge[K, EV]] {
    +      val cleanEdgeFun = clean(edgeFilterFun)
    +
    +      override def filter(value: Edge[K, EV]): Boolean = 
cleanEdgeFun(value)
    +    }
    +
    +    //wrapGraph(jgraph.filterOnEdges(edgeFilter))
    --- End diff --
    
    Maybe unnecessary comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to