Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81226805 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * A bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist + * between a pair of vertices from different vertex sets. E.g., there can be no edges between a pair + * of top vertices. + * + * <p>A bipartite graph is useful for representing graphs with two sets of objects, like researchers and their publications, + * where an edge indicates that a particular publication was authored by a particular researcher.
+ * + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods. + * + * @param <KT> the key type of the top vertices + * @param <KB> the key type of the bottom vertices + * @param <VT> the top vertices value type + * @param <VB> the bottom vertices value type + * @param <EV> the edge value type + */ +public class BipartiteGraph<KT, KB, VT, VB, EV> { + private final ExecutionEnvironment context; + private final DataSet<Vertex<KT, VT>> topVertices; + private final DataSet<Vertex<KB, VB>> bottomVertices; + private final DataSet<BipartiteEdge<KT, KB, EV>> edges; + + private BipartiteGraph( + DataSet<Vertex<KT, VT>> topVertices, + DataSet<Vertex<KB, VB>> bottomVertices, + DataSet<BipartiteEdge<KT, KB, EV>> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** + * Create bipartite graph from datasets. 
+ * + * @param topVertices dataset of top vertices in the graph + * @param bottomVertices dataset of bottom vertices in the graph + * @param edges dataset of edges between vertices + * @param context Flink execution context + * @param <KT> the key type of the top vertices + * @param <KB> the key type of the bottom vertices + * @param <VT> the top vertices value type + * @param <VB> the bottom vertices value type + * @param <EV> the edge value type + * @return new bipartite graph created from provided datasets + */ + public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet( + DataSet<Vertex<KT, VT>> topVertices, + DataSet<Vertex<KB, VB>> bottomVertices, + DataSet<BipartiteEdge<KT, KB, EV>> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** + * Get dataset with top vertices. + * + * @return dataset with top vertices + */ + public DataSet<Vertex<KT, VT>> getTopVertices() { + return topVertices; + } + + /** + * Get dataset with bottom vertices. + * + * @return dataset with bottom vertices + */ + public DataSet<Vertex<KB, VB>> getBottomVertices() { + return bottomVertices; + } + + /** + * Get dataset with graph edges. + * + * @return dataset with graph edges + */ + public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() { + return edges; + } + + /** + * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new + * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to. + * + * <p>The caller should provide a function that will create an edge between two top vertices. This function will receive + * a collection of all connections between each pair of top vertices. Note that this function will be called twice for + * each pair of connected vertices, so it is up to the caller whether one or two edges should be created.
+ * + * @param edgeFactory function that will be used to create edges in the new graph + * @param <NEV> the edge value type in the new graph + * @return top projection of the bipartite graph + */ + public <NEV> Graph<KT, VT, NEV> topProjection( + final GroupReduceFunction<Tuple2< + Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, + Vertex<KB, VB>>, + Edge<KT, NEV>> edgeFactory) { + + DataSet<Edge<KT, NEV>> newEdges = edges.join(edges) + .where(new TopProjectionKeySelector<KT, KB, EV>()) --- End diff -- Using the field index should be faster than a key selector, and allows the optimizer to reuse sorted fields.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---