Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2564#discussion_r81779541 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices. + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between + * a pair of top vertices. + * + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular author. 
+ * + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * {@link BipartiteGraph#simpleTopProjection()} or + * {@link BipartiteGraph#fullBottomProjection()} methods. + * + * @param <TK> the key type of the top vertices + * @param <BK> the key type of the bottom vertices + * @param <TV> the top vertices value type + * @param <BV> the bottom vertices value type + * @param <EV> the edge value type + */ +public class BipartiteGraph<TK, BK, TV, BV, EV> { + private final ExecutionEnvironment context; + private final DataSet<Vertex<TK, TV>> topVertices; + private final DataSet<Vertex<BK, BV>> bottomVertices; + private final DataSet<BipartiteEdge<TK, BK, EV>> edges; + + private BipartiteGraph( + DataSet<Vertex<TK, TV>> topVertices, + DataSet<Vertex<BK, BV>> bottomVertices, + DataSet<BipartiteEdge<TK, BK, EV>> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** + * Create bipartite graph from datasets. 
+ * + * @param topVertices dataset of top vertices in the graph + * @param bottomVertices dataset of bottom vertices in the graph + * @param edges dataset of edges between vertices + * @param context Flink execution context + * @param <KT> the key type of the top vertices + * @param <KB> the key type of the bottom vertices + * @param <VT> the top vertices value type + * @param <VB> the bottom vertices value type + * @param <EV> the edge value type + * @return new bipartite graph created from provided datasets + */ + public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet( + DataSet<Vertex<KT, VT>> topVertices, + DataSet<Vertex<KB, VB>> bottomVertices, + DataSet<BipartiteEdge<KT, KB, EV>> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** + * Get dataset with top vertices. + * + * @return dataset with top vertices + */ + public DataSet<Vertex<TK, TV>> getTopVertices() { + return topVertices; + } + + /** + * Get dataset with bottom vertices. + * + * @return dataset with bottom vertices + */ + public DataSet<Vertex<BK, BV>> getBottomVertices() { + return bottomVertices; + } + + /** + * Get dataset with graph edges. + * + * @return dataset with graph edges + */ + public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() { + return edges; + } + + /** + * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new + * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to. 
+ * + * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that + * connect top vertices in the original graph + */ + public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() { + + DataSet<Edge<TK, Tuple2<EV, EV>>> newEdges = edges.join(edges) + .where(1) + .equalTo(1) + .filter(new FilterFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>>() { + @Override + public boolean filter(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception { + BipartiteEdge<TK, BK, EV> edge1 = value.f0; + BipartiteEdge<TK, BK, EV> edge2 = value.f1; + return !edge1.getTopId().equals(edge2.getTopId()); + } + }) + .map(new MapFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>, Edge<TK, Tuple2<EV, EV>>>() { + @Override + public Edge<TK, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception { + return new Edge<>( --- End diff -- The `Edge` and nested `Tuple2` can be reused.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and you would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---