Repository: flink
Updated Branches:
  refs/heads/master 954beca7e -> 52ebb295c


[FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs

Checkpointing currently does not support this special case


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52ebb295
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52ebb295
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52ebb295

Branch: refs/heads/master
Commit: 52ebb295c6782e5cc9c7747656c278849ec9030a
Parents: 954beca
Author: mbalassi <mbala...@apache.org>
Authored: Tue Apr 7 17:04:39 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Tue Apr 7 17:04:39 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamGraph.java |  7 ++++
 .../apache/flink/streaming/api/IterateTest.java | 37 ++++++++++++++++----
 2 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 351dec9..aa71804 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -481,6 +481,11 @@ public class StreamGraph extends StreamingPlan {
         */
        public JobGraph getJobGraph(String jobGraphName) {
 
+               // temporarily forbid checkpointing for iterative jobs
+               if (isIterative() && isCheckpointingEnabled()){
+                       throw new UnsupportedOperationException("Checkpointing is currently not supported for iterative jobs!");
+               }
+
                this.jobName = jobGraphName;
 
                WindowingOptimizer.optimizeGraph(this);
@@ -558,6 +563,8 @@ public class StreamGraph extends StreamingPlan {
                return iterationTimeouts.get(vertexID);
        }
 
+       public boolean isIterative() { return !iterationIds.isEmpty(); }
+
        public String getOperatorName(Integer vertexID) {
                return operatorNames.get(vertexID);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index a64c4b1..31bd147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -29,11 +29,13 @@ import org.junit.Test;
 import java.util.Collections;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class IterateTest {
 
        private static final long MEMORYSIZE = 32;
        private static boolean iterated[];
+       private static int PARALLELISM = 2;
 
        public static final class IterationHead extends RichFlatMapFunction<Boolean,Boolean> {
 
@@ -73,14 +75,10 @@ public class IterateTest {
                }
        }
 
-       @Test
-       public void test() throws Exception {
-               int parallelism = 2;
-               StreamExecutionEnvironment env = new TestStreamEnvironment(parallelism, MEMORYSIZE);
-               iterated = new boolean[parallelism];
+       public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){
                env.setBufferTimeout(10);
 
-               DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism, false));
+               DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
 
                IterativeDataStream<Boolean> iteration = source.iterate(3000);
 
@@ -88,6 +86,15 @@ public class IterateTest {
                                new IterationTail());
 
                iteration.closeWith(increment).addSink(new MySink());
+               return env;
+       }
+
+       @Test
+       public void test() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+               iterated = new boolean[PARALLELISM];
+
+               env = constructIterativeJob(env);
 
                env.execute();
 
@@ -97,4 +104,22 @@ public class IterateTest {
 
        }
 
+       @Test
+       public void testWithCheckPointing() throws Exception {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+               env = constructIterativeJob(env);
+
+               env.enableCheckpointing();
+               try {
+                       env.execute();
+
+                       // this statement should never be reached
+                       fail();
+               } catch (UnsupportedOperationException e) {
+                       // expected behaviour
+               }
+
+       }
+
 }

Reply via email to