Repository: flink
Updated Branches:
  refs/heads/master 41ab43604 -> e8faff795


[FLINK-1813] Avoid IllegalStateException when trying to broadcast on finished operators

Closes #577


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8faff79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8faff79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8faff79

Branch: refs/heads/master
Commit: e8faff795005205ca877f8e9c4588e5bde42fd65
Parents: 41ab436
Author: Gyula Fora <gyf...@apache.org>
Authored: Tue Apr 7 22:17:18 2015 +0200
Committer: Gyula Fora <gyf...@apache.org>
Committed: Wed Apr 8 15:23:56 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/streamvertex/StreamVertex.java | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8faff79/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 5b80531..b56eda3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -96,13 +96,11 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
        @Override
        public void broadcastBarrierFromSource(long id) {
-               if (this.isRunning) {
-                       // Only called at input vertices
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received barrier from jobmanager: " + id);
-                       }
-                       actOnBarrier(id);
+               // Only called at input vertices
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Received barrier from jobmanager: " + id);
                }
+               actOnBarrier(id);
        }
 
        /**
@@ -190,10 +188,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
                        }
                        throw e;
                } finally {
+                       this.isRunning = false;
                        // Cleanup
                        outputHandler.flushOutputs();
                        clearBuffers();
-                       this.isRunning = false;
                }
 
        }
@@ -288,7 +286,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
         * @param id
         */
        private synchronized void actOnBarrier(long id) {
-               if (this.isRunning) {
+               if (isRunning) {
                        try {
                                outputHandler.broadcastBarrier(id);
                                confirmBarrier(id);
@@ -296,7 +294,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
                                        LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
                                }
                        } catch (Exception e) {
-                               throw new RuntimeException(e);
+                               // Only throw any exception if the vertex is still running
+                               if (isRunning) {
+                                       throw new RuntimeException(e);
+                               }
                        }
                }
        }

Reply via email to