[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15831233#comment-15831233 ] ASF GitHub Bot commented on FLINK-5113: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2939 > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826104#comment-15826104 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 Thanks a lot @zentol . > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826101#comment-15826101 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2939 merging. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824135#comment-15824135 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2939 @kl0u Looks good, only thing left is to wait for travis. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824100#comment-15824100 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 Hi @zentol ! I integrated some remaining comments could you have a look? > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744832#comment-15744832 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 I think your argument is valid. This is how I had it before, but I changed it because it becomes too verbose, given that the `ListState.get()` just gives you an Iterable that you have to iterate over and put its elements in a list in order to check its size. But I think that it is worth doing it. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742957#comment-15742957 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2939 It should be part of this PR since this case simply didn't exist before the new interface. The equivalent to no state was the state being null, at which point restore was never called in the first place. Now, if the state is null we get an empty list, afaik. Here is what is confusing me: Every single function checks whether the state is empty. Every one. So, there is apparently the possibility that it's empty. But the behavior for that case does not seem well-defined. According to the code receiving an empty state list is not a reason to fail for any of these tests. If this is the case we don't need to actually implement `restoreState` in the first place since it is irrelevant to the result of the test If this is not the case we should try to fail as early as possible by adding a failure condition. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742498#comment-15742498 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 @zentol Thanks for the review. I integrated your comments. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742489#comment-15742489 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 The test should cover if something semantically wrong happened during restoring. For example, if you expected some state that never came, then the test should fail. This is not a matter of the interface and thus not part of this PR I think. What do you think? > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742218#comment-15742218 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91961116 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java --- @@ -68,15 +69,18 @@ else if (duplicateChecker.nextClearBit(0) != numElementsTotal) { } @Override - public Tuple2snapshotState(long checkpointId, long checkpointTimestamp) { - LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId); - return new Tuple2<>(numElements, duplicateChecker); + public List > snapshotState(long checkpointId, long timestamp) throws Exception { + LOG.info("Snapshot of counter "+numElements+" at checkpoint "+ checkpointId); --- End diff -- either add all missing spaces or none :P > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742217#comment-15742217 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91962004 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java --- @@ -513,8 +514,8 @@ public void cancel() { } private static class StatefulCounter - extends RichMapFunction- implements Checkpointed , CheckpointListener { + extends RichMapFunction --- End diff -- please revert the indentation changes > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742221#comment-15742221 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2939 Shouldn't all the `ListCheckpointed` functions that actually snapshot something fail if they are given an empty state list? > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742171#comment-15742171 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 @zentol Thanks for the review. I integrated your comments and I am waiting for Travis. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735541#comment-15735541 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91730804 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java --- @@ -72,34 +74,27 @@ public void run(SourceContext ctx) throws Exception { public void cancel() { running = false; } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return null; - } - - @Override - public void restoreState(Integer state) { - - } } - public static class StatefulMapper implements MapFunction, Checkpointed, CheckpointListener { + public static class StatefulMapper implements MapFunction , ListCheckpointed, CheckpointListener { private String someState; private boolean atLeastOneSnapshotComplete = false; private boolean restored = false; @Override - public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return this; + public List snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this); } @Override - public void restoreState(StatefulMapper state) { - restored = true; - this.someState = state.someState; - this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete; + public void restoreState(List state) throws Exception { + if (!state.isEmpty()) { --- End diff -- Alright, i figured out why we can't fail here immediately. It still seems odd though that do not explicitly differentiate between a call to restore before any state was snapshotted and a broken snapshotting that doesn't return a state, although this applies to all other tests as well. If the test is successful f that we are getting the state that we snapshotted we should also have failure condition in case this does not happen; currently we simply enter undefined territory. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735515#comment-15735515 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91724133 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java --- @@ -72,34 +74,27 @@ public void run(SourceContext ctx) throws Exception { public void cancel() { running = false; } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return null; - } - - @Override - public void restoreState(Integer state) { - - } } - public static class StatefulMapper implements MapFunction, Checkpointed, CheckpointListener { + public static class StatefulMapper implements MapFunction , ListCheckpointed, CheckpointListener { private String someState; private boolean atLeastOneSnapshotComplete = false; private boolean restored = false; @Override - public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return this; + public List snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this); } @Override - public void restoreState(StatefulMapper state) { - restored = true; - this.someState = state.someState; - this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete; + public void restoreState(List state) throws Exception { + if (!state.isEmpty()) { --- End diff -- If the state is empty we should fail immediately; currently (I think) this would cause us to fail with the RuntimeException saying "Intended failure, to trigger restore", which is a bit inaccurate. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735508#comment-15735508 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91719698 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java --- @@ -199,11 +200,13 @@ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) t Assert.fail("Count is different at start end end of snapshot."); } semaphore.release(); - return sum; + return Collections.singletonList((Serializable) sum); } @Override - public void restoreState(Serializable state) {} + public void restoreState(List state) throws Exception { + --- End diff -- can you remove this empty line? > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735512#comment-15735512 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91719644 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java --- @@ -1,4 +1,4 @@ -/** +/* --- End diff -- unrelated change > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735514#comment-15735514 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91721509 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java --- @@ -47,7 +47,7 @@ * A simple test that runs a streaming topology with checkpointing enabled. * * The test triggers a failure after a while and verifies that, after completion, the - * state defined with either the {@link ValueState} or the {@link Checkpointed} + * state defined with either the {@link ValueState} or the {@link org.apache.flink.streaming.api.checkpoint.ListCheckpointed} --- End diff -- do not use the fully qualified class name here. (it's not required since you import the class anyway ;) ) > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735516#comment-15735516 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91721182 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java --- @@ -362,19 +367,21 @@ public void flatMap2(String value, Collector out) { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { --- End diff -- Please retain the order of the original methods; snapshot -> restore -> close > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735510#comment-15735510 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91721756 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java --- @@ -447,5 +434,23 @@ public void notifyCheckpointComplete(long checkpointId) { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + + @Override + public List snapshotState(long checkpointId, long timestamp) throws Exception { + if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) { --- End diff -- please move this methods up again to reduce the diff. The methods are identical apart from the signature and return statement, the diff should reflect that. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735505#comment-15735505 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91721848 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java --- @@ -372,23 +374,25 @@ public void notifyCheckpointComplete(long checkpointId) { numSuccessfulCheckpoints++; } - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsEmitted; + public static void reset() { + failedBefore = false; --- End diff -- method order, snapshot -> restore -> reset > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735509#comment-15735509 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91719518 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java --- @@ -128,11 +131,16 @@ private static Task createTask( when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - ChainedStateHandle operatorState = new ChainedStateHandle<>(Collections.singletonList(state)); + ChainedStateHandle operatorState = null; List keyGroupStateFromBackend = Collections.emptyList(); List keyGroupStateFromStream = Collections.emptyList(); - ListoperatorStateBackend = Collections.emptyList(); - List operatorStateStream = Collections.emptyList(); + + Map testState = new HashMap<>(); + testState.put("test", new long[] {0, 10}); + + Collection handle = Collections.singletonList(new OperatorStateHandle(testState, state)); + List operatorStateBackend = Collections.singletonList(handle); + List operatorStateStream = Collections.singletonList(handle); --- End diff -- Can this be an empty list as well? > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735506#comment-15735506 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91721132 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java --- @@ -323,26 +329,25 @@ public PrefixCount map(String value) { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public void close() throws IOException { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; --- End diff -- Please retain the order of the original methods; snapshot -> restore -> close > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735511#comment-15735511 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91720508 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java --- @@ -199,11 +200,13 @@ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) t Assert.fail("Count is different at start end end of snapshot."); } semaphore.release(); - return sum; + return Collections.singletonList((Serializable) sum); --- End diff -- You don't to cast here, instead use `return Collections.singletonList(sum);` > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735517#comment-15735517 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91721578 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java --- @@ -338,14 +344,17 @@ public void flatMap(PrefixCount value, Collector out) throws Except } } + --- End diff -- remove new line > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735507#comment-15735507 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91720690 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java --- @@ -566,23 +568,25 @@ public void notifyCheckpointComplete(long checkpointId) { numSuccessfulCheckpoints++; } - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsEmitted; + public static void reset() { --- End diff -- please move this method to the bottom of the class again. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735513#comment-15735513 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2939#discussion_r91722459 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java --- @@ -57,7 +59,7 @@ public static void main(String[] args) throws Exception { // with Checkpoining - public static class SimpleStringGenerator implements SourceFunction, Checkpointed { --- End diff -- I understand why you did not replace ´Checkpointed` with a different interface, but this technically changes the test. If a function can still implement these interfaces without doing anything within snapshot/restore, then we should cover this with tests. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This is issue aims at porting all these function to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15721823#comment-15721823 ] ASF GitHub Bot commented on FLINK-5113: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/2939 [FLINK-5113] Ports all functions in the tests to the new checkpointing interface. This PR only touches testing code. It refactors all the tests that were using the `Checkpointed` and `CheckpointedAsynchronously` interfaces to use the new `ListCheckpointed` and `CheckpointedFunction` ones. There is one commit per class, but the changes are very small and easy to review. R: @aljoscha @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink test-func-ref-reb Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2939.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2939 commit e8eecc97385b66dccdc31c057591e265e1dfc963 Author: kl0uDate: 2016-11-22T17:38:54Z [FLINK-5113] CoStreamCheckpointingITCase refactoring. commit 588d98175bf14699fca8ef0117ee37f2acdb8c65 Author: kl0u Date: 2016-11-22T18:23:33Z [FLINK-5113] EventTimeAllWindowCheckpointingITCase refactoring. commit 31d642eeafd98587999cd1ec69aff87b6224f941 Author: kl0u Date: 2016-11-22T18:32:24Z [FLINK-5113] AbstractEventTimeWindowCheckpointingITCase refactoring. commit 26a1ffefaf4b7fa01b7df80bd175e98337a0eef0 Author: kl0u Date: 2016-11-22T18:49:47Z [FLINK-5113] StreamCheckpointingITCase refactoring. commit efacdbd5fce6e3ebb590a952da3f8e1b1b7b42ea Author: kl0u Date: 2016-11-23T10:49:24Z [FLINK-5113] StreamCheckpointNotifierITCase refactoring commit 9815ee405fa21d875963039cb145eea21025e737 Author: kl0u Date: 2016-11-23T10:56:06Z [FLINK-5113] BucketingSinkFaultToleranceITCase refactoring. commit b1cc019622fedcb08b0e0a232a3deedd2c4e37c5 Author: kl0u Date: 2016-11-23T11:11:11Z [FLINK-5113] StateCheckpointedITCase refactoring commit f6e73c2fe6f34953e5029ba79845b720ac5a0c05 Author: kl0u Date: 2016-11-23T11:21:24Z [FLINK-5113] CheckpointingCustomKvStateProgram refactoring. commit d6c4ad75f7f47c872fb5fb6650c9f3a111a55108 Author: kl0u Date: 2016-11-23T11:31:11Z [FLINK-5113] JobManagerHACheckpointRecoveryITCase refactoring commit 7a99f7bb186be2a235ba28e3d61d65f60a82b571 Author: kl0u Date: 2016-11-23T13:14:40Z [FLINK-5113] CheckpointedStreamingProgram refactoring. commit 30096fe6399c8fd3f14bd5c23af4ee1506ca26e2 Author: kl0u Date: 2016-11-23T13:24:54Z [FLINK-5113] WindowCheckpointingITCase refactoring. commit 02424d0ea88986618cb66d9b5d902e86512981dc Author: kl0u Date: 2016-11-23T13:31:18Z [FLINK-5113] TaskManagerProcessFailureStreamingRecoveryITCase refactoring. commit 65eed028ff2af4106b509c55f2cf15c902e6f6d6 Author: kl0u Date: 2016-11-23T13:41:58Z [FLINK-5113] RescalingITCase refactoring commit f25c387d919eb569fd80abbf3621e3482c9fdb79 Author: kl0u Date: 2016-11-23T13:55:02Z [FLINK-5113] CassandraTupleWriteAheadSinkExample refactoring. commit 6071795423b3c9bf5143c32c93ce69e159e9bb4f Author: kl0u Date: 2016-11-23T14:02:10Z [FLINK-5113] SavepointITCase refactoring. commit 4212d45a5592f83016864d4a588219672797142e Author: kl0u Date: 2016-11-23T14:11:29Z [FLINK-5113] ChaosMonkeyITCase refactoring. commit 32738006d4e48ac42985ed2b77b9b6555e352e75 Author: kl0u Date: 2016-11-23T14:16:37Z [FLINK-5113] UdfStreamOperatorCheckpointingITCase refactoring. commit a7e883ff6d1c56e84be4ed49e0b53acab2bce54d Author: kl0u Date: 2016-11-23T14:19:58Z [FLINK-5113] InterruptSensitiveRestoreTest refactoring. commit 6c1acbba4f3bae5c3b5ba812beafa3caeddc4a0f Author: kl0u Date: 2016-11-23T16:08:38Z [FLINK-5113] KafkaConsumerTestBase refactored. commit b88d9085ddedff90e68534b84450796d9edd3396 Author: kl0u Date: 2016-11-23T16:12:35Z [FLINK-5113] ValidatingExactlyOnceSink refactoring. commit a9e2d86177ccd446bc13e353e09cbb5c8c0a7016 Author: kl0u Date: 2016-11-23T16:21:19Z [FLINK-5113] PartitionedStateCheckpointingITCase refactoring. commit 8161507acf63926e52dcd0952565017aafaa1359 Author: kl0u Date: 2016-11-23T16:34:45Z [FLINK-5113] FailingIdentityMapper refactoring. commit