Repository: flink Updated Branches: refs/heads/master bbd97354b -> 9f7110748
[FLINK-2663] [gelly] Updated Gelly library methods to use generic key types This squashes the following commits: [gelly] Added missing Javadocs to GSA classes [FLINK-2663] [gelly] Updated Gelly library methods to also use generic vertex/edge values where possible This closes #1152 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f711074 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f711074 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f711074 Branch: refs/heads/master Commit: 9f7110748434d2bdea96f16c70f4f0894261ae69 Parents: bbd9735 Author: vasia <va...@apache.org> Authored: Sun Sep 20 21:20:11 2015 +0200 Committer: vasia <va...@apache.org> Committed: Thu Oct 1 21:22:35 2015 +0200 ---------------------------------------------------------------------- .../graph/example/ConnectedComponents.java | 2 +- .../flink/graph/example/MusicProfiles.java | 3 +- .../apache/flink/graph/gsa/ApplyFunction.java | 15 +++ .../apache/flink/graph/gsa/GatherFunction.java | 16 +++ .../org/apache/flink/graph/gsa/SumFunction.java | 18 +++- .../flink/graph/library/CommunityDetection.java | 53 ++++++---- .../graph/library/ConnectedComponents.java | 32 +++--- .../graph/library/GSAConnectedComponents.java | 22 ++-- .../apache/flink/graph/library/GSAPageRank.java | 8 +- .../library/GSASingleSourceShortestPaths.java | 8 +- .../flink/graph/library/GSATriangleCount.java | 102 +++++++++---------- .../flink/graph/library/LabelPropagation.java | 12 ++- .../apache/flink/graph/library/PageRank.java | 8 +- .../library/SingleSourceShortestPaths.java | 8 +- .../flink/graph/utils/NullValueEdgeMapper.java | 32 ++++++ .../apache/flink/graph/gsa/GSACompilerTest.java | 1 - .../flink/graph/gsa/GSATranslationTest.java | 1 - .../flink/graph/test/GatherSumApplyITCase.java | 8 +- .../test/library/CommunityDetectionITCase.java | 4 +- ...ctedComponentsWithRandomisedEdgesITCase.java | 3 +- 
.../test/library/LabelPropagationITCase.java | 8 +- .../graph/test/library/PageRankITCase.java | 8 +- .../graph/test/library/TriangleCountITCase.java | 6 +- .../test/operations/GraphOperationsITCase.java | 1 - 24 files changed, 242 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java index bd08190..4189602 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -69,7 +69,7 @@ public class ConnectedComponents implements ProgramDescription { }, env); DataSet<Vertex<Long, Long>> verticesWithMinIds = graph - .run(new GSAConnectedComponents(maxIterations)).getVertices(); + .run(new GSAConnectedComponents<Long, NullValue>(maxIterations)); // emit result if (fileOutput) { http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index a56224d..e347bc5 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -153,8 +153,7 @@ public class MusicProfiles 
implements ProgramDescription { public Long map(Tuple2<Long, Long> value) { return value.f1; } - }).run(new LabelPropagation<String>(maxIterations)) - .getVertices(); + }).run(new LabelPropagation<String, NullValue>(maxIterations)); if (fileOutput) { verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t"); http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java index ed0cf70..5a8e97a 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -27,6 +27,13 @@ import org.apache.flink.util.Collector; import java.io.Serializable; import java.util.Collection; +/** + * The base class for the third and last step of a {@link GatherSumApplyIteration}. + * + * @param <K> the vertex ID type + * @param <VV> the vertex value type + * @param <M> the input type (produced by the Sum phase) + */ @SuppressWarnings("serial") public abstract class ApplyFunction<K, VV, M> implements Serializable { @@ -51,6 +58,14 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable { //--------------------------------------------------------------------------------------------- + /** + * This method is invoked once per superstep, after the {@link SumFunction} + * in a {@link GatherSumApplyIteration}. + * It updates the Vertex values. + * + * @param newValue the value computed during the current superstep. + * @param currentValue the current Vertex value. 
+ */ public abstract void apply(M newValue, VV currentValue); /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java index 5a09a5a..563b20e 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -25,6 +25,13 @@ import org.apache.flink.types.Value; import java.io.Serializable; import java.util.Collection; +/** + * The base class for the first step of a {@link GatherSumApplyIteration}. + * + * @param <VV> the vertex value type + * @param <EV> the edge value type + * @param <M> the output type + */ @SuppressWarnings("serial") public abstract class GatherFunction<VV, EV, M> implements Serializable { @@ -49,6 +56,15 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable { //--------------------------------------------------------------------------------------------- + /** + * This method is invoked once per superstep, for each {@link Neighbor} of each Vertex + * at the beginning of each superstep in a {@link GatherSumApplyIteration}. + * It needs to produce a partial value, which will be combined with other partial values + * in the next phase of the iteration. + * + * @param neighbor the input Neighbor. It provides access to the source Vertex and the Edge objects. + * @return a partial result to be combined in the Sum phase. 
+ */ public abstract M gather(Neighbor<VV, EV> neighbor); /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java index 69baae4..f27e275 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -25,6 +25,13 @@ import org.apache.flink.types.Value; import java.io.Serializable; import java.util.Collection; +/** + * The base class for the second step of a {@link GatherSumApplyIteration}. + * + * @param <VV> the vertex value type + * @param <EV> the edge value type + * @param <M> the output type + */ @SuppressWarnings("serial") public abstract class SumFunction<VV, EV, M> implements Serializable { @@ -48,7 +55,16 @@ public abstract class SumFunction<VV, EV, M> implements Serializable { } //--------------------------------------------------------------------------------------------- - + /** + * This method is invoked once per superstep, after the {@link GatherFunction} + * in a {@link GatherSumApplyIteration}. + * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)} + * in pairs, until a single value has been computed. + * + * @param arg0 the first partial value. + * @param arg1 the second partial value. + * @return the combined value. 
+ */ public abstract M sum(M arg0, M arg1); /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java index 31488ee..0dd39fc 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java @@ -19,6 +19,8 @@ package org.apache.flink.graph.library; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; @@ -34,18 +36,21 @@ import java.util.TreeMap; /** * Community Detection Algorithm. * - * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value. + * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments. + * + * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0. * The vertices propagate their labels and max scores in iterations, each time adopting the label with the * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value. * * The algorithm converges when vertices no longer update their value or when the maximum number of iterations * is reached. 
+ * + * @param <K> the Vertex ID type * * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a> */ -public class CommunityDetection implements - GraphAlgorithm<Long, Long, Double, Graph<Long, Long, Double>> { +public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> { private Integer maxIterations; @@ -58,20 +63,22 @@ public class CommunityDetection implements } @Override - public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) { + public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) { - Graph<Long, Long, Double> undirectedGraph = graph.getUndirected(); + DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices() + .map(new AddScoreToVertexValuesMapper<K>()); - Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph - .mapVertices(new AddScoreToVertexValuesMapper()); + Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices = + Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected(); - return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta), - new LabelMessenger(), maxIterations) - .mapVertices(new RemoveScoreFromVertexValuesMapper()); + return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta), + new LabelMessenger<K>(), maxIterations) + .mapVertices(new RemoveScoreFromVertexValuesMapper<K>()); } @SuppressWarnings("serial") - public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> { + public static final class VertexLabelUpdater<K> extends VertexUpdateFunction< + K, Tuple2<Long, Double>, Tuple2<Long, Double>> { private Double delta; @@ -80,7 +87,7 @@ public class CommunityDetection implements } @Override - public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex, + public void updateVertex(Vertex<K, Tuple2<Long, 
Double>> vertex, MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception { // we would like these two maps to be ordered @@ -140,34 +147,36 @@ public class CommunityDetection implements } @SuppressWarnings("serial") - public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>, + public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>, Tuple2<Long, Double>, Double> { @Override - public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception { + public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception { - for(Edge<Long, Double> edge : getEdges()) { + for(Edge<K, Double> edge : getEdges()) { sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0, vertex.getValue().f1 * edge.getValue())); } - } } @SuppressWarnings("serial") - public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> { + @ForwardedFields("f0") + public static final class AddScoreToVertexValuesMapper<K> implements MapFunction< + Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> { - @Override - public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception { - return new Tuple2<Long, Double>(vertex.getValue(), 1.0); + public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) { + return new Vertex<K, Tuple2<Long, Double>>( + vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0)); } } @SuppressWarnings("serial") - public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> { + public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction< + Vertex<K, Tuple2<Long, Double>>, Long> { @Override - public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception { + public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception { return vertex.getValue().f0; } 
} http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java index 871f315..ed853fe 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java @@ -18,27 +18,32 @@ package org.apache.flink.graph.library; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.NullValueEdgeMapper; import org.apache.flink.types.NullValue; /** - * A vertex-centric implementation of the Connected components algorithm. + * A vertex-centric implementation of the Connected Components algorithm. * - * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their - * current component ID in iterations, each time adopting a new value from the received neighbor IDs, + * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs. + * The vertices propagate their current component ID in iterations, each time adopting a new value from the received neighbor IDs, * provided that the value is less than the current minimum. * * The algorithm converges when vertices no longer update their value or when the maximum number of iterations * is reached. 
+ * + * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID. + * + * @see {@link org.apache.flink.graph.library.GSAConnectedComponents} */ @SuppressWarnings("serial") -public class ConnectedComponents implements - GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> { +public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> { private Integer maxIterations; @@ -47,21 +52,24 @@ public class ConnectedComponents implements } @Override - public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception { + public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception { - Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected(); + Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>()) + .getUndirected(); // initialize vertex values and run the Vertex Centric Iteration - return undirectedGraph.runVertexCentricIteration(new CCUpdater(), new CCMessenger(), maxIterations); + return undirectedGraph.runVertexCentricIteration( + new CCUpdater<K>(), new CCMessenger<K>(), maxIterations) + .getVertices(); } /** * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages. */ - public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> { + public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> { @Override - public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception { + public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception { long min = Long.MAX_VALUE; for (long msg : messages) { @@ -78,10 +86,10 @@ public class ConnectedComponents implements /** * Distributes the minimum ID associated with a given vertex among all the target vertices. 
*/ - public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> { + public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> { @Override - public void sendMessages(Vertex<Long, Long> vertex) throws Exception { + public void sendMessages(Vertex<K, Long> vertex) throws Exception { // send current minimum to neighbors sendMessageToAllNeighbors(vertex.getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java index f852f3e..77bc2cf 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java @@ -18,19 +18,25 @@ package org.apache.flink.graph.library; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.Neighbor; +import org.apache.flink.graph.utils.NullValueEdgeMapper; import org.apache.flink.types.NullValue; /** * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration. + * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs. + * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID. 
+ * + * @see {@link org.apache.flink.graph.library.ConnectedComponents} */ -public class GSAConnectedComponents implements - GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> { +public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> { private Integer maxIterations; @@ -39,13 +45,15 @@ public class GSAConnectedComponents implements } @Override - public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception { + public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception { - Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected(); + Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>()) + .getUndirected(); // initialize vertex values and run the Vertex Centric Iteration - return undirectedGraph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(), - maxIterations); + return undirectedGraph.runGatherSumApplyIteration( + new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId<K>(), + maxIterations).getVertices(); } // -------------------------------------------------------------------------------------------- @@ -69,7 +77,7 @@ public class GSAConnectedComponents implements }; @SuppressWarnings("serial") - private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> { + private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> { public void apply(Long summedValue, Long origValue) { if (summedValue < origValue) { http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java index 6ce2ed6..df3e89a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.Neighbor; @@ -36,7 +37,7 @@ import org.apache.flink.graph.gsa.SumFunction; * * The implementation assumes that each page has at least one incoming and one outgoing link. */ -public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> { +public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { private double beta; private int maxIterations; @@ -58,7 +59,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K } @Override - public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception { + public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { if (numberOfVertices == 0) { numberOfVertices = network.numberOfVertices(); @@ -70,7 +71,8 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper()); return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(), - new UpdateRanks<K>(beta, numberOfVertices), maxIterations); + new UpdateRanks<K>(beta, numberOfVertices), maxIterations) + .getVertices(); } // -------------------------------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java index 18bdd1d..5a76072 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.library; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -31,7 +32,7 @@ import org.apache.flink.graph.gsa.Neighbor; * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration */ public class GSASingleSourceShortestPaths<K> implements - GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> { + GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { private final K srcVertexId; private final Integer maxIterations; @@ -42,11 +43,12 @@ public class GSASingleSourceShortestPaths<K> implements } @Override - public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { + public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) { return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(), - new UpdateDistance<K>(), maxIterations); + new UpdateDistance<K>(), maxIterations) + .getVertices(); } @SuppressWarnings("serial") 
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java index 3d4d902..76d170d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.ReduceNeighborsFunction; @@ -46,62 +45,61 @@ import java.util.TreeMap; * * This implementation is non - iterative. * - * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of - * Tuple1 which contains a single integer representing the number of triangles. + * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet + * which contains a single integer representing the number of triangles. 
*/ -public class GSATriangleCount implements - GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> { +public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements + GraphAlgorithm<K, VV, EV, DataSet<Integer>> { @SuppressWarnings("serial") @Override - public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception { + public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception { ExecutionEnvironment env = input.getContext(); // order the edges so that src is always higher than trg - DataSet<Edge<Long, NullValue>> edges = input.getEdges() - .map(new OrderEdges()).distinct(); + DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct(); - Graph<Long, TreeMap<Long, Integer>, NullValue> graph = Graph.fromDataSet(edges, - new VertexInitializer(), env); + Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges, + new VertexInitializer<K>(), env); // select neighbors with ids higher than the current vertex id // Gather: a no-op in this case // Sum: create the set of neighbors - DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors = - graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN); + DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors = + graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN); - Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithReinitializedVertexValues = - graph.mapVertices(new VertexInitializerEmptyTreeMap()); + Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues = + graph.mapVertices(new VertexInitializerEmptyTreeMap<K>()); // Apply: attach the computed values to the vertices // joinWithVertices to update the node values - DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithHigherIdNeighbors = - graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices(); + DataSet<Vertex<K, TreeMap<K, 
Integer>>> verticesWithHigherIdNeighbors = + graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices(); - Graph<Long, TreeMap<Long,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors, + Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors, edges, env); // propagate each received value to neighbors with higher id // Gather: a no-op in this case // Sum: propagate values - DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues = graphWithNeighbors - .reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN); + DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors + .reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN); // Apply: attach propagated values to vertices - DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithPropagatedValues = - graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices(); + DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues = + graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices(); - Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithPropagatedNeighbors = + Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors = Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env); // Scatter: compute the number of triangles - DataSet<Tuple1<Integer>> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets() - .map(new ComputeTriangles()).reduce(new ReduceFunction<Tuple1<Integer>>() { + DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets() + .map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() { @Override - public Tuple1<Integer> reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws Exception { - return new 
Tuple1<Integer>(firstTuple.f0 + secondTuple.f0); + public Integer reduce(Integer first, Integer second) throws Exception { + return first + second; } }); @@ -109,24 +107,25 @@ public class GSATriangleCount implements } @SuppressWarnings("serial") - private static final class OrderEdges implements MapFunction<Edge<Long, NullValue>, Edge<Long, NullValue>> { + private static final class OrderEdges<K extends Comparable<K>, EV> implements + MapFunction<Edge<K, EV>, Edge<K, NullValue>> { @Override - public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) throws Exception { - if (edge.getSource() < edge.getTarget()) { - return new Edge<Long, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance()); + public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception { + if (edge.getSource().compareTo(edge.getTarget()) < 0) { + return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance()); } else { - return edge; + return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance()); } } } @SuppressWarnings("serial") - private static final class VertexInitializer implements MapFunction<Long, TreeMap<Long, Integer>> { + private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> { @Override - public TreeMap<Long, Integer> map(Long value) throws Exception { - TreeMap<Long, Integer> neighbors = new TreeMap<Long, Integer>(); + public TreeMap<K, Integer> map(K value) throws Exception { + TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>(); neighbors.put(value, 1); return neighbors; @@ -134,31 +133,32 @@ public class GSATriangleCount implements } @SuppressWarnings("serial") - private static final class VertexInitializerEmptyTreeMap implements - MapFunction<Vertex<Long, TreeMap<Long, Integer>>, TreeMap<Long, Integer>> { + private static final class VertexInitializerEmptyTreeMap<K> implements + MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> { @Override - 
public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, Integer>> vertex) throws Exception { - return new TreeMap<Long, Integer>(); + public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception { + return new TreeMap<K, Integer>(); } } @SuppressWarnings("serial") - private static final class AttachValues implements MapFunction<Tuple2<TreeMap<Long, Integer>, - TreeMap<Long, Integer>>, TreeMap<Long, Integer>> { + private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>, + TreeMap<K, Integer>>, TreeMap<K, Integer>> { @Override - public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, Integer>, TreeMap<Long, Integer>> tuple2) throws Exception { + public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception { return tuple2.f1; } } @SuppressWarnings("serial") - private static final class GatherHigherIdNeighbors implements ReduceNeighborsFunction<TreeMap<Long,Integer>> { + private static final class GatherHigherIdNeighbors<K> implements + ReduceNeighborsFunction<TreeMap<K,Integer>> { @Override - public TreeMap<Long,Integer> reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) { - for (Long key : second.keySet()) { + public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) { + for (K key : second.keySet()) { Integer value = first.get(key); if (value != null) { first.put(key, value + second.get(key)); @@ -171,20 +171,20 @@ public class GSATriangleCount implements } @SuppressWarnings("serial") - private static final class ComputeTriangles implements MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>, - Tuple1<Integer>> { + private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>, + Integer> { @Override - public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, Integer>, NullValue> triplet) throws Exception { + public Integer map(Triplet<K, 
TreeMap<K, Integer>, NullValue> triplet) throws Exception { - Vertex<Long, TreeMap<Long, Integer>> srcVertex = triplet.getSrcVertex(); - Vertex<Long, TreeMap<Long, Integer>> trgVertex = triplet.getTrgVertex(); + Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex(); + Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex(); int triangles = 0; if(trgVertex.getValue().get(srcVertex.getId()) != null) { - triangles=trgVertex.getValue().get(srcVertex.getId()); + triangles = trgVertex.getValue().get(srcVertex.getId()); } - return new Tuple1<Integer>(triangles); + return triangles; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index 2dd2180..82dfee7 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -18,12 +18,14 @@ package org.apache.flink.graph.library; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.NullValueEdgeMapper; import org.apache.flink.types.NullValue; import java.util.HashMap; @@ -42,8 +44,8 @@ import java.util.Map.Entry; */ @SuppressWarnings("serial") -public class LabelPropagation<K extends Comparable<K>> implements - GraphAlgorithm<K, Long, NullValue, Graph<K, Long, NullValue>> { 
+public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV, + DataSet<Vertex<K, Long>>> { private final int maxIterations; @@ -52,12 +54,12 @@ public class LabelPropagation<K extends Comparable<K>> implements } @Override - public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) { + public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) { // iteratively adopt the most frequent label among the neighbors // of each vertex - return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), - maxIterations); + return input.mapEdges(new NullValueEdgeMapper<K, EV>()).runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), + maxIterations).getVertices(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index 47f7acd..8193dba 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -37,8 +37,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction; * * The implementation assumes that each page has at least one incoming and one outgoing link. 
*/ -public class PageRank<K> implements - GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> { +public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { private double beta; private int maxIterations; @@ -66,7 +65,7 @@ public class PageRank<K> implements } @Override - public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception { + public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { if (numberOfVertices == 0) { numberOfVertices = network.numberOfVertices(); @@ -78,7 +77,8 @@ public class PageRank<K> implements .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper()); return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), - new RankMessenger<K>(numberOfVertices), maxIterations); + new RankMessenger<K>(numberOfVertices), maxIterations) + .getVertices(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 1911f73..60c4c17 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.library; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -31,8 +32,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction; * 
This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration. */ @SuppressWarnings("serial") -public class SingleSourceShortestPaths<K> implements - GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> { +public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { private final K srcVertexId; private final Integer maxIterations; @@ -43,11 +43,11 @@ public class SingleSourceShortestPaths<K> implements } @Override - public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { + public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) { return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) .runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), - maxIterations); + maxIterations).getVertices(); } public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> { http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java new file mode 100644 index 0000000..2bd4719 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.graph.Edge; +import org.apache.flink.types.NullValue; + +public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> { + + private static final long serialVersionUID = 1L; + + public NullValue map(Edge<K, EV> edge) { + return NullValue.getInstance(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java index 7a66639..2ad203f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java @@ -56,7 +56,6 @@ public class GSACompilerTest extends CompilerTestBase { env.setParallelism(DEFAULT_PARALLELISM); // compose test program { - @SuppressWarnings("unchecked") DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>( 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>()); http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java index 0a7b1c7..ced7508 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java @@ -71,7 +71,6 @@ public class GSATranslationTest { // ------------ construct the test program ------------------ { - @SuppressWarnings("unchecked") DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>( 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>()); http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java index 4b381b6..0213f02 100755 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -55,8 +55,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase { ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env), new InitMapperCC(), env); - List<Vertex<Long, Long>> result = inputGraph.run(new GSAConnectedComponents(16)) - .getVertices().collect(); + List<Vertex<Long, Long>> result = inputGraph.run( + new GSAConnectedComponents<Long, NullValue>(16)).collect(); expectedResult = "1,1\n" + "2,1\n" + @@ -78,8 +78,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase { 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new InitMapperSSSP(), env); - List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16)) - .getVertices().collect(); + List<Vertex<Long, Double>> result = inputGraph.run( + new GSASingleSourceShortestPaths<Long>(1l, 16)).collect(); expectedResult = "1,0.0\n" + "2,12.0\n" + http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java index 104996e..421eaa9 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java @@ -50,7 +50,7 @@ public class CommunityDetectionITCase extends MultipleProgramsTestBase { Graph<Long, Long, Double> inputGraph = Graph.fromDataSet( CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env); - List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA)) + List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA)) .getVertices().collect(); expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION; @@ -66,7 +66,7 @@ public class CommunityDetectionITCase extends MultipleProgramsTestBase { Graph<Long, Long, Double> inputGraph = Graph.fromDataSet( CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env); - List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA)) + List<Vertex<Long, Long>> result = 
inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA)) .getVertices().collect(); expected = CommunityDetectionData.COMMUNITIES_WITH_TIE; compareResultAsTuples(result, expected); http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java index ef4b467..9eb7387 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java @@ -60,8 +60,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); - DataSet<Vertex<Long, Long>> result = graph - .run(new ConnectedComponents(100)).getVertices(); + DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100)); result.writeAsCsv(resultPath, "\n", " "); env.execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java index da36ef6..8785b0d 100644 --- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java @@ -51,8 +51,8 @@ public class LabelPropagationITCase extends MultipleProgramsTestBase { LabelPropagationData.getDefaultVertexSet(env), LabelPropagationData.getDefaultEdgeDataSet(env), env); - List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1)) - .getVertices().collect(); + List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long, NullValue>(1)) + .collect(); expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION; compareResultAsTuples(result, expectedResult); @@ -69,8 +69,8 @@ public class LabelPropagationITCase extends MultipleProgramsTestBase { LabelPropagationData.getTieVertexSet(env), LabelPropagationData.getTieEdgeDataSet(env), env); - List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1)) - .getVertices().collect(); + List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long, NullValue>(1)) + .collect(); expectedResult = LabelPropagationData.LABELS_WITH_TIE; compareResultAsTuples(result, expectedResult); http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java index cc0327f..94c7713 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java @@ -51,7 +51,7 @@ public class PageRankITCase extends MultipleProgramsTestBase { 
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3)) - .getVertices().collect(); + .collect(); compareWithDelta(result, expectedResult, 0.01); } @@ -64,7 +64,7 @@ public class PageRankITCase extends MultipleProgramsTestBase { PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3)) - .getVertices().collect(); + .collect(); compareWithDelta(result, expectedResult, 0.01); } @@ -77,7 +77,7 @@ public class PageRankITCase extends MultipleProgramsTestBase { PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3)) - .getVertices().collect(); + .collect(); compareWithDelta(result, expectedResult, 0.01); } @@ -90,7 +90,7 @@ public class PageRankITCase extends MultipleProgramsTestBase { PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3)) - .getVertices().collect(); + .collect(); compareWithDelta(result, expectedResult, 0.01); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java index 047bbf7..1d9ab9f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java @@ -19,12 +19,12 @@ package org.apache.flink.graph.test.library; import 
org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.graph.Graph; import org.apache.flink.graph.example.utils.TriangleCountData; import org.apache.flink.graph.library.GSATriangleCount; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -48,9 +48,9 @@ public class TriangleCountITCase extends MultipleProgramsTestBase { Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), env).getUndirected(); - List<Tuple1<Integer>> numberOfTriangles = graph.run(new GSATriangleCount()).collect(); + List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect(); expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; - compareResultAsTuples(numberOfTriangles, expectedResult); + Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java index a06f881..ffc9da9 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java @@ -326,7 +326,6 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase { compareResultAsTuples(result, expectedResult); } - @SuppressWarnings("unchecked") 
@Test public void testDifference2() throws Exception { /*