[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501313#comment-14501313 ]
ASF GitHub Bot commented on FLINK-1523: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643470 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.IncrementalSSSPData; +import org.apache.flink.graph.spargel.IterationConfiguration; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +/** + * Incremental Single Sink Shortest Paths Example. + * + * The program takes as input the resulted graph after a SSSP computation, + * an edge to be removed and the initial graph(i.e. before SSSP was computed). + * + * - If the removed edge does not belong to the SP-graph, no computation is necessary. + * The edge is simply removed from the graph. + * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge, + * potentially require re-computation. + * When the edge <u, v> is removed, v checks if it has another out-going SP-edge. + * If yes, no further computation is required. + * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF. + * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message. + * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge. + * If not, it invalidates its current value and propagates the INVALIDATE message. 
+ * The propagation stops when a vertex with an alternative shortest path is reached + * or when we reach a vertex with no SP-in-neighbors. + * + * Usage <code>IncrementalSSSPExample <vertex path> <edge path> <edges in SSSP> + * <edge to be removed> <result path> <number of iterations></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData} + */ +@SuppressWarnings("serial") +public class IncrementalSSSPExample implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env); + + Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved(); + + Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env); + + // Assumption: all minimum weight paths are kept + Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env); + + // remove the edge + graph.removeEdge(edgeToBeRemoved); + + // configure the iteration + IterationConfiguration parameters = new IterationConfiguration(); + + if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) { + + parameters.setDirection(EdgeDirection.IN); + parameters.setOptDegrees(true); + + // run the vertex centric iteration to propagate info + Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(), + new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters); + + DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); + + // Emit results + if(fileOutput) { + resultedVertices.writeAsCsv(outputPath, "\n", ","); + } else { + resultedVertices.print(); + } + + env.execute("Incremental SSSP 
Example"); + } else { + // print the vertices + if(fileOutput) { + vertices.writeAsCsv(outputPath, "\n", ","); + } else { + vertices.print(); + } + + env.execute("Incremental SSSP Example"); + } + } + + @Override + public String getDescription() { + return "Incremental Single Sink Shortest Paths Example"; + } + + // ****************************************************************************************************************** + // IncrementalSSSP METHODS + // ****************************************************************************************************************** + + /** + * Function that verifies whether the edge to be removed is part of the SSSP or not. + * If it is, the src vertex will be invalidated. + * + * @param edgeToBeRemoved + * @param edgesInSSSP + * @return + */ + private static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception { + + return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() { + @Override + public boolean filter(Edge<Long, Double> edge) throws Exception { + return edge.equals(edgeToBeRemoved); + } + }).count() > 0; + } + + public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { + + @Override + public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception { + if (inMessages.hasNext()) { + Long outDegree = vertex.getOutDegree() - 1; + // check if the vertex has another SP-Edge + if (outDegree > 0) { + // there is another shortest path from the source to this vertex + } else { + // set own value to infinity + setNewVertexValue(Double.MAX_VALUE); + } + } + } + } + + public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> { + + private Edge<Long, Double> edgeToBeRemoved; + + public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) { + this.edgeToBeRemoved = edgeToBeRemoved; + } + + @Override + public void 
sendMessages(Vertex<Long, Double> vertex) throws Exception { + + + if(getSuperstepNumber() == 1) { + if(vertex.getId().equals(edgeToBeRemoved.getSource())) { + // activate the edge target + sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE); + } + } + + if(getSuperstepNumber() > 1) { + // invalidate all edges + for(Edge<Long, Double> edge : getEdges()) { + sendMessageTo(edge.getSource(), Double.MAX_VALUE); + } + } + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String verticesInputPath = null; + + private static String edgesInputPath = null; + + private static String edgesInSSSPInputPath = null; + + private static String edgeToBeRemoved = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + if (args.length > 0) { + if (args.length == 6) { + fileOutput = true; + verticesInputPath = args[0]; + edgesInputPath = args[1]; + edgesInSSSPInputPath = args[2]; + edgeToBeRemoved = args[3]; + outputPath = args[4]; + maxIterations = Integer.parseInt(args[5]); + } else { + System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("See the documentation for the correct format of input files."); + System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> <output path> <max iterations>"); + + return false; + } + } + return true; + } + + private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(verticesInputPath) + 
.lineDelimiter("\n") + .types(Long.class, Double.class) + .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() { + + @Override + public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception { + return new Vertex<Long, Double>(tuple2.f0, tuple2.f1); + } + }); + } else { + System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " + + "<output path> <max iterations>"); + return IncrementalSSSPData.getDefaultVertexDataSet(env); + } + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() { + + @Override + public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception { + return new Edge(tuple3.f0, tuple3.f1, tuple3.f2); + } + }); + } else { + System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " + + "<output path> <max iterations>"); + return IncrementalSSSPData.getDefaultEdgeDataSet(env); + } + } + + private static DataSet<Edge<Long, Double>> getEdgesinSSSPDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInSSSPInputPath) + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() { --- End diff -- same > Vertex-centric iteration extensions > ----------------------------------- > > Key: FLINK-1523 > URL: https://issues.apache.org/jira/browse/FLINK-1523 > Project: Flink > Issue Type: Improvement > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Andra Lungu > > We would like to make the following extensions to the vertex-centric > iterations of Gelly: > - allow vertices to access their in/out degrees and the total number of > vertices of the graph, inside 
the iteration.
> - allow choosing the neighborhood type (in/out/all) over which to run the
> vertex-centric iteration. Now, the model uses the updates of the in-neighbors
> to calculate state and send messages to out-neighbors. We could add a
> parameter with value "in/out/all" to the {{VertexUpdateFunction}} and
> {{MessagingFunction}}, that would indicate the type of neighborhood.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)