Repository: flink
Updated Branches:
  refs/heads/master f7b113d96 -> a50eb73e2


[FLINK-3181] [gelly] avoid unnecessary messages in SSSP examples and library method

This closes #1467


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a50eb73e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a50eb73e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a50eb73e

Branch: refs/heads/master
Commit: a50eb73e28fab48e92dc34c12b0b2ca4d99e1bf2
Parents: f7b113d
Author: vasia <va...@apache.org>
Authored: Thu Dec 17 16:11:58 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Mon Dec 28 15:05:41 2015 +0200

----------------------------------------------------------------------
 .../graph/scala/example/SingleSourceShortestPaths.scala | 6 ++++--
 .../flink/graph/example/SingleSourceShortestPaths.java | 6 ++++--
 .../flink/graph/library/SingleSourceShortestPaths.java | 9 +++++----
 3 files changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a50eb73e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
index 7fc23c4..8b918d4 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -113,8 +113,10 @@ object SingleSourceShortestPaths {
     MessagingFunction[Long, Double, Double, Double] {
 
     override def sendMessages(vertex: Vertex[Long, Double]) {
-      for (edge: Edge[Long, Double] <- getEdges) {
-        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/a50eb73e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
index ef09bff..40e304c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -135,8 +135,10 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 
         @Override
         public void sendMessages(Vertex<Long, Double> vertex) {
-            for (Edge<Long, Double> edge : getEdges()) {
-                sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+            if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+                for (Edge<Long, Double> edge : getEdges()) {
+                    sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/a50eb73e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 68bc1ae..baf7fe1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -108,10 +108,11 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
     public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
 
         @Override
-        public void sendMessages(Vertex<K, Double> vertex)
-                throws Exception {
-            for (Edge<K, Double> edge : getEdges()) {
-                sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+        public void sendMessages(Vertex<K, Double> vertex) {
+            if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+                for (Edge<K, Double> edge : getEdges()) {
+                    sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+                }
             }
         }
     }