Repository: flink Updated Branches: refs/heads/master 954beca7e -> 52ebb295c
[FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs. Checkpointing currently does not support this special case. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52ebb295 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52ebb295 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52ebb295 Branch: refs/heads/master Commit: 52ebb295c6782e5cc9c7747656c278849ec9030a Parents: 954beca Author: mbalassi <mbala...@apache.org> Authored: Tue Apr 7 17:04:39 2015 +0200 Committer: mbalassi <mbala...@apache.org> Committed: Tue Apr 7 17:04:39 2015 +0200 ---------------------------------------------------------------------- .../apache/flink/streaming/api/StreamGraph.java | 7 ++++ .../apache/flink/streaming/api/IterateTest.java | 37 ++++++++++++++++---- 2 files changed, 38 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 351dec9..aa71804 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -481,6 +481,11 @@ public class StreamGraph extends StreamingPlan { */ public JobGraph getJobGraph(String jobGraphName) { + // temporarily forbid checkpointing for iterative jobs + if (isIterative() && isCheckpointingEnabled()){ + throw new 
UnsupportedOperationException("Checkpointing is currently not supported for iterative jobs!"); + } + this.jobName = jobGraphName; WindowingOptimizer.optimizeGraph(this); @@ -558,6 +563,8 @@ public class StreamGraph extends StreamingPlan { return iterationTimeouts.get(vertexID); } + public boolean isIterative() { return !iterationIds.isEmpty(); } + public String getOperatorName(Integer vertexID) { return operatorNames.get(vertexID); } http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index a64c4b1..31bd147 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -29,11 +29,13 @@ import org.junit.Test; import java.util.Collections; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class IterateTest { private static final long MEMORYSIZE = 32; private static boolean iterated[]; + private static int PARALLELISM = 2; public static final class IterationHead extends RichFlatMapFunction<Boolean,Boolean> { @@ -73,14 +75,10 @@ public class IterateTest { } } - @Test - public void test() throws Exception { - int parallelism = 2; - StreamExecutionEnvironment env = new TestStreamEnvironment(parallelism, MEMORYSIZE); - iterated = new boolean[parallelism]; + public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){ env.setBufferTimeout(10); - DataStream<Boolean> source = 
env.fromCollection(Collections.nCopies(parallelism, false)); + DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false)); IterativeDataStream<Boolean> iteration = source.iterate(3000); @@ -88,6 +86,15 @@ public class IterateTest { new IterationTail()); iteration.closeWith(increment).addSink(new MySink()); + return env; + } + + @Test + public void test() throws Exception { + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + iterated = new boolean[PARALLELISM]; + + env = constructIterativeJob(env); env.execute(); @@ -97,4 +104,22 @@ public class IterateTest { } + @Test + public void testWithCheckPointing() throws Exception { + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + + env = constructIterativeJob(env); + + env.enableCheckpointing(); + try { + env.execute(); + + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } + + } + }