[FLINK-5113] Port functions in tests to new CheckpointedFunction IF.

This closes #2939.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/525edf1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/525edf1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/525edf1e

Branch: refs/heads/master
Commit: 525edf1e6925b55302d991ddf537a2f16caba21d
Parents: 570dbc8
Author: kl0u <kklou...@gmail.com>
Authored: Tue Nov 22 19:23:33 2016 +0100
Committer: zentol <ches...@apache.org>
Committed: Thu Jan 19 23:57:22 2017 +0100

----------------------------------------------------------------------
 .../CassandraTupleWriteAheadSinkExample.java    |  22 ++-
 .../fs/RollingSinkFaultToleranceITCase.java     |  18 ++-
 .../BucketingSinkFaultToleranceITCase.java      |  18 ++-
 .../connectors/kafka/KafkaConsumerTestBase.java |  22 ++-
 .../kafka/testutils/FailingIdentityMapper.java  |  18 ++-
 .../testutils/ValidatingExactlyOnceSink.java    |  26 ++--
 .../org/apache/flink/util/CollectionUtil.java   |   2 +-
 .../streaming/api/checkpoint/Checkpointed.java  |   3 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |  12 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  43 ++++--
 .../CoStreamCheckpointingITCase.java            |  74 ++++++----
 .../EventTimeAllWindowCheckpointingITCase.java  |  38 +++--
 .../PartitionedStateCheckpointingITCase.java    |  17 ++-
 .../test/checkpointing/RescalingITCase.java     |  27 ++--
 .../test/checkpointing/SavepointITCase.java     |  14 +-
 .../checkpointing/StateCheckpointedITCase.java  |  94 +++++++-----
 .../StreamCheckpointNotifierITCase.java         |  31 ++--
 .../StreamCheckpointingITCase.java              |  79 ++++++----
 .../UdfStreamOperatorCheckpointingITCase.java   |  14 +-
 .../WindowCheckpointingITCase.java              |  33 +++--
 .../jar/CheckpointedStreamingProgram.java       |  28 ++--
 .../jar/CheckpointingCustomKvStateProgram.java  |  21 +--
 .../jar/LegacyCheckpointedStreamingProgram.java | 148 +++++++++++++++++++
 .../flink/test/recovery/ChaosMonkeyITCase.java  |  38 +++--
 .../JobManagerHACheckpointRecoveryITCase.java   |  41 +++--
 ...erProcessFailureStreamingRecoveryITCase.java |  30 ++--
 26 files changed, 608 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
index 811c410..23de949 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
@@ -21,12 +21,14 @@ import com.datastax.driver.core.Cluster;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -50,6 +52,9 @@ public class CassandraTupleWriteAheadSinkExample {
                        .setQuery("INSERT INTO example.values (id, counter) 
values (?, ?);")
                        .enableWriteAheadLog()
                        .setClusterBuilder(new ClusterBuilder() {
+
+                               private static final long serialVersionUID = 
2793938419775311824L;
+
                                @Override
                                public Cluster buildCluster(Cluster.Builder 
builder) {
                                        return 
builder.addContactPoint("127.0.0.1").build();
@@ -62,7 +67,9 @@ public class CassandraTupleWriteAheadSinkExample {
                env.execute();
        }
 
-       public static class MySource implements SourceFunction<Tuple2<String, 
Integer>>, Checkpointed<Integer> {
+       public static class MySource implements SourceFunction<Tuple2<String, 
Integer>>, ListCheckpointed<Integer> {
+               private static final long serialVersionUID = 
4022367939215095610L;
+
                private int counter = 0;
                private boolean stop = false;
 
@@ -84,13 +91,16 @@ public class CassandraTupleWriteAheadSinkExample {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return counter;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.counter);
                }
 
                @Override
-               public void restoreState(Integer state) throws Exception {
-                       this.counter = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.counter = state.get(0);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 36c0d03..2d8492f 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -42,7 +42,9 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -236,7 +238,7 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
        }
 
        private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String>
-                       implements CheckpointedAsynchronously<Integer> {
+                       implements ListCheckpointed<Integer> {
 
                private static final long serialVersionUID = 1L;
 
@@ -246,7 +248,6 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
 
                private volatile boolean isRunning = true;
 
-
                StringGeneratingSourceFunction(long numElements) {
                        this.numElements = numElements;
                }
@@ -288,13 +289,16 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index 54703a3..85f23b6 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -42,7 +42,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.BufferedReader;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -233,7 +235,7 @@ public class BucketingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestB
        }
 
        private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String>
-                       implements CheckpointedAsynchronously<Integer> {
+                       implements ListCheckpointed<Integer> {
 
                private static final long serialVersionUID = 1L;
 
@@ -243,7 +245,6 @@ public class BucketingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestB
 
                private volatile boolean isRunning = true;
 
-
                StringGeneratingSourceFunction(long numElements) {
                        this.numElements = numElements;
                }
@@ -285,13 +286,16 @@ public class BucketingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestB
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index aa7ea49..d7fab88 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -51,21 +51,16 @@ import 
org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -73,7 +68,6 @@ import 
org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -1925,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
 
        public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-                       implements Checkpointed<Integer>, CheckpointListener {
+                       implements ListCheckpointed<Integer>, 
CheckpointListener {
 
                private static final long serialVersionUID = 
6334389850158707313L;
 
@@ -1939,7 +1933,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                private boolean failer;
                private boolean hasBeenCheckpointed;
 
-
                public BrokerKillingMapper(int shutdownBrokerId, int failCount) 
{
                        this.shutdownBrokerId = shutdownBrokerId;
                        this.failCount = failCount;
@@ -1994,13 +1987,16 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return numElementsTotal;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.numElementsTotal);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       this.numElementsTotal = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.numElementsTotal = state.get(0);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index 2bd400c..ec64b00 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -21,13 +21,16 @@ package 
org.apache.flink.streaming.connectors.kafka.testutils;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
+
 
 public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-               Checkpointed<Integer>, CheckpointListener, Runnable {
+       ListCheckpointed<Integer>, CheckpointListener, Runnable {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(FailingIdentityMapper.class);
        
@@ -89,13 +92,16 @@ public class FailingIdentityMapper<T> extends 
RichMapFunction<T,T> implements
        }
 
        @Override
-       public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-               return numElementsTotal;
+       public List<Integer> snapshotState(long checkpointId, long timestamp) 
throws Exception {
+               return Collections.singletonList(numElementsTotal);
        }
 
        @Override
-       public void restoreState(Integer state) {
-               numElementsTotal = state;
+       public void restoreState(List<Integer> state) throws Exception {
+               if (state.isEmpty() || state.size() > 1) {
+                       throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+               }
+               this.numElementsTotal = state.get(0);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 7813561..46e70fd 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -19,15 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.test.util.SuccessException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
 
-public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> 
implements Checkpointed<Tuple2<Integer, BitSet>> {
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> 
implements ListCheckpointed<Tuple2<Integer, BitSet>> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
@@ -39,7 +41,6 @@ public class ValidatingExactlyOnceSink extends 
RichSinkFunction<Integer> impleme
 
        private int numElements; // this is checkpointed
 
-       
        public ValidatingExactlyOnceSink(int numElementsTotal) {
                this.numElementsTotal = numElementsTotal;
        }
@@ -68,15 +69,20 @@ public class ValidatingExactlyOnceSink extends 
RichSinkFunction<Integer> impleme
        }
 
        @Override
-       public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long 
checkpointTimestamp) {
-               LOG.info("Snapshot of counter "+numElements+" at checkpoint 
"+checkpointId);
-               return new Tuple2<>(numElements, duplicateChecker);
+       public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, 
long timestamp) throws Exception {
+               LOG.info("Snapshot of counter " + numElements + " at checkpoint 
" + checkpointId);
+               return Collections.singletonList(new Tuple2<>(numElements, 
duplicateChecker));
        }
 
        @Override
-       public void restoreState(Tuple2<Integer, BitSet> state) {
-               LOG.info("restoring num elements to {}", state.f0);
-               this.numElements = state.f0;
-               this.duplicateChecker = state.f1;
+       public void restoreState(List<Tuple2<Integer, BitSet>> state) throws 
Exception {
+               if (state.isEmpty() || state.size() > 1) {
+                       throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+               }
+
+               Tuple2<Integer, BitSet> s = state.get(0);
+               LOG.info("restoring num elements to {}", s.f0);
+               this.numElements = s.f0;
+               this.duplicateChecker = s.f1;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
index 15d00ae..cd5c2e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -34,4 +34,4 @@ public final class CollectionUtil {
        public static boolean isNullOrEmpty(Map<?, ?> map) {
                return map == null || map.isEmpty();
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index 7af5cea..fb67ea7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -31,8 +31,7 @@ import java.io.Serializable;
  * state is written, the function is not called, so the function needs not 
return a
  * copy of its state, but may return a reference to its state. Functions that 
can
  * continue to work and mutate the state, even while the state snapshot is 
being accessed,
- * can implement the {@link 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
- * interface.</p>
+ * can implement the {@link CheckpointedAsynchronously} interface.</p>
  * 
  * @param <T> The type of the operator state.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index b592fe8..dd1fe58 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -34,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -135,7 +136,7 @@ public class SourceStreamTaskTest {
                }
        }
 
-       private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, Checkpointed<Serializable> {
+       private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, ListCheckpointed<Serializable> {
                private static final long serialVersionUID = 1;
 
                private int maxElements;
@@ -181,7 +182,7 @@ public class SourceStreamTaskTest {
                }
 
                @Override
-               public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               public List<Serializable> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        if (!semaphore.tryAcquire()) {
                                Assert.fail("Concurrent invocation of 
snapshotState.");
                        }
@@ -199,11 +200,12 @@ public class SourceStreamTaskTest {
                                Assert.fail("Count is different at start end 
end of snapshot.");
                        }
                        semaphore.release();
-                       return sum;
+                       return Collections.<Serializable>singletonList(sum);
                }
 
                @Override
-               public void restoreState(Serializable state) {}
+               public void restoreState(List<Serializable> state) throws 
Exception {
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 583e42f..1911f44 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -53,7 +53,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -495,7 +497,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        // 
------------------------------------------------------------------------
 
        private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointListener
+                       implements ListCheckpointed<Integer>, CheckpointListener
        {
                private static volatile boolean failedBefore = false;
 
@@ -567,13 +569,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return numElementsEmitted;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return 
Collections.singletonList(this.numElementsEmitted);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       numElementsEmitted = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.numElementsEmitted = state.get(0);
                }
 
                public static void reset() {
@@ -582,7 +587,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
        }
 
        private static class ValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements Checkpointed<HashMap<Long, Integer>> {
+                       implements ListCheckpointed<HashMap<Long, Integer>> {
 
                private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
 
@@ -676,19 +681,22 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                }
 
                @Override
-               public HashMap<Long, Integer> snapshotState(long checkpointId, 
long checkpointTimestamp) {
-                       return this.windowCounts;
+               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.windowCounts);
                }
 
                @Override
-               public void restoreState(HashMap<Long, Integer> state) {
-                       this.windowCounts.putAll(state);
+               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       windowCounts.putAll(state.get(0));
                }
        }
 
        // Sink for validating the stateful window counts
        private static class CountValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements Checkpointed<HashMap<Long, Integer>> {
+                       implements ListCheckpointed<HashMap<Long, Integer>> {
 
                private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
 
@@ -757,13 +765,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                }
 
                @Override
-               public HashMap<Long, Integer> snapshotState(long checkpointId, 
long checkpointTimestamp) {
-                       return this.windowCounts;
+               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.windowCounts);
                }
 
                @Override
-               public void restoreState(HashMap<Long, Integer> state) {
-                       this.windowCounts.putAll(state);
+               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.windowCounts.putAll(state.get(0));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 51a00b9..06d3ab0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -38,6 +38,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -142,11 +144,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
        /**
         * A generating source that is slow before the first two checkpoints 
went through
         * and will indefinitely stall at a certain point to allow the 
checkpoint to complete.
-        * 
+        *
         * After the checkpoints are through, it continues with full speed.
         */
        private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String>
-                       implements Checkpointed<Integer>, CheckpointListener {
+                       implements ListCheckpointed<Integer>, 
CheckpointListener {
 
                private static volatile int numCompletedCheckpoints = 0;
 
@@ -210,13 +212,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
 
                @Override
@@ -239,12 +244,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
        }
 
        private static class StatefulCounterFunction extends 
RichMapFunction<PrefixCount, PrefixCount> 
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
 
                static final long[] counts = new long[PARALLELISM];
                
                private long count;
-               
 
                @Override
                public PrefixCount map(PrefixCount value) throws Exception {
@@ -258,13 +262,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 
@@ -303,7 +310,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
        }
 
-       private static class StringRichFilterFunction extends 
RichFilterFunction<String> implements Checkpointed<Long> {
+       private static class StringRichFilterFunction extends 
RichFilterFunction<String> implements ListCheckpointed<Long> {
 
                static final long[] counts = new long[PARALLELISM];
 
@@ -321,18 +328,21 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 
-       private static class StringPrefixCountRichMapFunction extends 
RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-
+       private static class StringPrefixCountRichMapFunction extends 
RichMapFunction<String, PrefixCount> implements ListCheckpointed<Long> {
+               
                static final long[] counts = new long[PARALLELISM];
 
                private long count;
@@ -344,13 +354,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
 
                @Override
@@ -359,7 +372,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
        }
 
-       private static class LeftIdentityCoRichFlatMapFunction extends 
RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
+       private static class LeftIdentityCoRichFlatMapFunction extends 
RichCoFlatMapFunction<String, String, String> implements ListCheckpointed<Long> 
{
 
                static final long[] counts = new long[PARALLELISM];
 
@@ -378,13 +391,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index b493e42..09c1437 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -43,14 +43,16 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.*;
 
 /**
- * This verfies that checkpointing works correctly with event time windows.
+ * This verifies that checkpointing works correctly with event time windows.
  *
  * <p>
  * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for 
All-Windows.
@@ -432,7 +434,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
        // 
------------------------------------------------------------------------
 
        private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointListener
+                       implements ListCheckpointed<Integer>, CheckpointListener
        {
                private static volatile boolean failedBefore = false;
 
@@ -502,23 +504,26 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                        numSuccessfulCheckpoints++;
                }
 
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return numElementsEmitted;
+               public static void reset() {
+                       failedBefore = false;
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       numElementsEmitted = state;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return 
Collections.singletonList(this.numElementsEmitted);
                }
 
-               public static void reset() {
-                       failedBefore = false;
+               @Override
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.numElementsEmitted = state.get(0);
                }
        }
 
        private static class ValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements Checkpointed<HashMap<Long, Integer>> {
+                       implements ListCheckpointed<HashMap<Long, Integer>> {
 
                private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
 
@@ -612,13 +617,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                }
 
                @Override
-               public HashMap<Long, Integer> snapshotState(long checkpointId, 
long checkpointTimestamp) {
-                       return this.windowCounts;
+               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.windowCounts);
                }
 
                @Override
-               public void restoreState(HashMap<Long, Integer> state) {
-                       this.windowCounts.putAll(state);
+               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.windowCounts.putAll(state.get(0));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 0728b41..4761d70 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -33,7 +35,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -86,7 +88,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
        // 
--------------------------------------------------------------------------------------------
 
        private static class IntGeneratingSourceFunction extends 
RichParallelSourceFunction<Integer> 
-               implements Checkpointed<Integer> {
+               implements ListCheckpointed<Integer> {
 
                private final long numElements;
 
@@ -133,13 +135,16 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index bd1678e..8045d82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -66,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -763,7 +763,7 @@ public class RescalingITCase extends TestLogger {
                }
        }
 
-       private static class SubtaskIndexNonPartitionedStateSource extends 
SubtaskIndexSource implements Checkpointed<Integer> {
+       private static class SubtaskIndexNonPartitionedStateSource extends 
SubtaskIndexSource implements ListCheckpointed<Integer> {
 
                private static final long serialVersionUID = 
8388073059042040203L;
 
@@ -772,13 +772,16 @@ public class RescalingITCase extends TestLogger {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return counter;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.counter);
                }
 
                @Override
-               public void restoreState(Integer state) throws Exception {
-                       counter = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.counter = state.get(0);
                }
        }
 
@@ -879,18 +882,20 @@ public class RescalingITCase extends TestLogger {
                }
        }
 
-       private static class NonPartitionedStateSource extends StateSourceBase 
implements Checkpointed<Integer> {
+       private static class NonPartitionedStateSource extends StateSourceBase 
implements ListCheckpointed<Integer> {
 
                private static final long serialVersionUID = 
-8108185918123186841L;
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return counter;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.counter);
                }
 
                @Override
-               public void restoreState(Integer state) throws Exception {
-                       counter = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (!state.isEmpty()) {
+                               this.counter = state.get(0);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 9f957e5..77777d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -63,7 +63,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -527,7 +526,7 @@ public class SavepointITCase extends TestLogger {
 
        private static class StatefulCounter
                extends RichMapFunction<Integer, Integer>
-               implements Checkpointed<byte[]>, CheckpointListener {
+               implements ListCheckpointed<byte[]>, CheckpointListener {
 
                private static final Object checkpointLock = new Object();
                private static int numCompleteCalls;
@@ -556,13 +555,16 @@ public class SavepointITCase extends TestLogger {
                }
 
                @Override
-               public byte[] snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return data;
+               public List<byte[]> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(data);
                }
 
                @Override
-               public void restoreState(byte[] data) throws Exception {
-                       this.data = data;
+               public void restoreState(List<byte[]> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.data = state.get(0);
 
                        synchronized (checkpointLock) {
                                if (++numRestoreCalls == 
getRuntimeContext().getNumberOfParallelSubtasks()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 9d37b59..32d9e23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -34,9 +33,10 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -47,7 +47,7 @@ import static org.junit.Assert.assertTrue;
  * A simple test that runs a streaming topology with checkpointing enabled.
  *
  * The test triggers a failure after a while and verifies that, after 
completion, the
- * state defined with either the {@link ValueState} or the {@link Checkpointed}
+ * state defined with either the {@link ValueState} or the {@link 
ListCheckpointed}
  * interface reflects the "exactly once" semantics.
  * 
  * The test throttles the input until at least two checkpoints are completed, 
to make sure that
@@ -139,15 +139,14 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
        // 
--------------------------------------------------------------------------------------------
        
        private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String> 
-                       implements CheckpointedAsynchronously<Integer>
+                       implements ListCheckpointed<Integer>
        {
                private final long numElements;
 
                private int index;
 
                private volatile boolean isRunning = true;
-               
-               
+
                StringGeneratingSourceFunction(long numElements) {
                        this.numElements = numElements;
                }
@@ -197,23 +196,26 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
        }
 
        private static class StringRichFilterFunction extends 
RichFilterFunction<String> 
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
 
                static final long[] counts = new long[PARALLELISM];
                
                private long count;
-               
+
                @Override
                public boolean filter(String value) throws Exception {
                        count++;
@@ -226,23 +228,26 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 
        private static class StringPrefixCountRichMapFunction extends 
RichMapFunction<String, PrefixCount> 
-                       implements CheckpointedAsynchronously<Long> {
+                       implements ListCheckpointed<Long> {
                
                static final long[] counts = new long[PARALLELISM];
 
                private long count;
-               
+
                @Override
                public PrefixCount map(String value) {
                        count++;
@@ -255,18 +260,21 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
        
        private static class StatefulCounterFunction extends 
RichMapFunction<PrefixCount, PrefixCount> 
-               implements Checkpointed<Long> {
+               implements ListCheckpointed<Long> {
 
                static final long[] counts = new long[PARALLELISM];
                
@@ -284,18 +292,21 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
        
        private static class OnceFailingAggregator extends 
RichFlatMapFunction<PrefixCount, PrefixCount> 
-               implements Checkpointed<HashMap<String, PrefixCount>>, 
CheckpointListener {
+               implements ListCheckpointed<HashMap<String, PrefixCount>>, 
CheckpointListener {
 
                static boolean wasCheckpointedBeforeFailure = false;
                
@@ -307,7 +318,6 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                private long count;
                
                private boolean wasCheckpointed;
-               
 
                OnceFailingAggregator(long failurePos) {
                        this.failurePos = failurePos;
@@ -339,13 +349,16 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public HashMap<String, PrefixCount> snapshotState(long 
checkpointId, long checkpointTimestamp) {
-                       return aggregationMap;
+               public List<HashMap<String, PrefixCount>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.aggregationMap);
                }
 
                @Override
-               public void restoreState(HashMap<String, PrefixCount> state) {
-                       aggregationMap.putAll(state);
+               public void restoreState(List<HashMap<String, PrefixCount>> 
state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.aggregationMap.putAll(state.get(0));
                }
 
                @Override
@@ -355,7 +368,7 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
        }
 
        private static class ValidatingSink extends 
RichSinkFunction<PrefixCount> 
-                       implements Checkpointed<HashMap<Character, Long>> {
+                       implements ListCheckpointed<HashMap<Character, Long>> {
 
                @SuppressWarnings("unchecked")
                private static Map<Character, Long>[] maps = (Map<Character, 
Long>[]) new Map<?, ?>[PARALLELISM];
@@ -379,13 +392,16 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public HashMap<Character, Long> snapshotState(long 
checkpointId, long checkpointTimestamp) {
-                       return counts;
+               public List<HashMap<Character, Long>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.counts);
                }
 
                @Override
-               public void restoreState(HashMap<Character, Long> state) {
-                       counts.putAll(state);
+               public void restoreState(List<HashMap<Character, Long>> state) 
throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.counts.putAll(state.get(0));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 6bf511f..be3fac5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -205,7 +206,7 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
         * interface it stores all the checkpoint ids it has seen in a static list.
         */
        private static class GeneratingSourceFunction extends 
RichSourceFunction<Long>
-                       implements ParallelSourceFunction<Long>, 
CheckpointListener, Checkpointed<Integer> {
+                       implements ParallelSourceFunction<Long>, 
CheckpointListener, ListCheckpointed<Integer> {
                
                static final List<Long>[] completedCheckpoints = 
createCheckpointLists(PARALLELISM);
                
@@ -263,13 +264,16 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
 
                @Override
@@ -390,7 +394,7 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
         * Reducer that causes one failure between seeing 40% to 70% of the records.
         */
        private static class OnceFailingReducer extends 
RichReduceFunction<Tuple1<Long>> 
-               implements Checkpointed<Long>, CheckpointListener
+               implements ListCheckpointed<Long>, CheckpointListener
        {
                static volatile boolean hasFailed = false;
                static volatile long failureCheckpointID;
@@ -402,7 +406,7 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
                private volatile long count;
 
                private volatile boolean notificationAlready;
-               
+
                OnceFailingReducer(long numElements) {
                        this.failurePos = (long) (0.5 * numElements / 
PARALLELISM);
                }
@@ -419,19 +423,22 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        if (!hasFailed && count >= failurePos && 
getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception 
<<<<<<<<<<<<<<<<<<<<<");
                                hasFailed = true;
                                failureCheckpointID = checkpointId;
                                throw new Exception("Test Failure");
                        }
-                       return count;
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index b97e1f2..aae04c9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -31,6 +31,8 @@ import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +43,7 @@ import static org.junit.Assert.assertEquals;
  * A simple test that runs a streaming topology with checkpointing enabled.
  * 
  * The test triggers a failure after a while and verifies that, after completion, the
- * state defined with the {@link Checkpointed} interface reflects the "exactly once" semantics.
+ * state defined with the {@link ListCheckpointed} interface reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@@ -118,7 +120,7 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
        // 
--------------------------------------------------------------------------------------------
        
        private static class StringGeneratingSourceFunction extends 
RichSourceFunction<String>
-                       implements ParallelSourceFunction<String>, 
Checkpointed<Integer> {
+                       implements ParallelSourceFunction<String>, 
ListCheckpointed<Integer> {
 
                private final long numElements;
                
@@ -131,7 +133,7 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                private volatile boolean isRunning = true;
 
                static final long[] counts = new long[PARALLELISM];
-               
+
                @Override
                public void close() throws IOException {
                        counts[getRuntimeContext().getIndexOfThisSubtask()] = 
index;
@@ -186,17 +188,20 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.index);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       index = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.index = state.get(0);
                }
        }
        
-       private static class StatefulCounterFunction extends 
RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
+       private static class StatefulCounterFunction extends 
RichMapFunction<PrefixCount, PrefixCount> implements ListCheckpointed<Long> {
 
                private long count;
                static final long[] counts = new long[PARALLELISM];
@@ -213,13 +218,16 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 
@@ -227,7 +235,7 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
         * This function uses simultaneously the key/value state and is checkpointed.
         */
        private static class OnceFailingPrefixCounter extends 
RichMapFunction<PrefixCount, PrefixCount> 
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
                
                private static Map<String, Long> prefixCounts = new 
ConcurrentHashMap<String, Long>();
                static final long[] counts = new long[PARALLELISM];
@@ -238,7 +246,7 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                
                private long failurePos;
                private long count;
-               
+
                private ValueState<Long> pCount;
                private long inputCount;
 
@@ -279,22 +287,25 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return inputCount;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.inputCount);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       inputCount = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.inputCount = state.get(0);
                }
        }
 
-       private static class StringRichFilterFunction extends 
RichFilterFunction<String> implements Checkpointed<Long> {
+       private static class StringRichFilterFunction extends 
RichFilterFunction<String> implements ListCheckpointed<Long> {
                
                static final long[] counts = new long[PARALLELISM];
 
                private long count;
-               
+
                @Override
                public boolean filter(String value) {
                        count++;
@@ -307,23 +318,26 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 
        private static class StringPrefixCountRichMapFunction extends 
RichMapFunction<String, PrefixCount>
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
                
                static final long[] counts = new long[PARALLELISM];
 
                private long count;
-               
+
                @Override
                public PrefixCount map(String value) throws IOException {
                        count++;
@@ -336,13 +350,16 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 5874f56..dae5cd9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -146,7 +145,7 @@ public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTe
         * augmented by the designated parallel subtaskId. The source is not parallel to ensure order.
         */
        private static class StatefulMultipleSequence extends 
RichSourceFunction<Tuple2<Integer, Long>>
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
 
                private long count;
 
@@ -168,13 +167,16 @@ public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTe
                public void cancel() {}
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.count = state.get(0);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index e424a8d..a45349d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -46,7 +46,9 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -310,7 +312,7 @@ public class WindowCheckpointingITCase extends TestLogger {
        // 
------------------------------------------------------------------------
 
        private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointListener
+                       implements ListCheckpointed<Integer>, CheckpointListener
        {
                private static volatile boolean failedBefore = false;
 
@@ -373,13 +375,16 @@ public class WindowCheckpointingITCase extends TestLogger 
{
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return numElementsEmitted;
+               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return 
Collections.singletonList(this.numElementsEmitted);
                }
 
                @Override
-               public void restoreState(Integer state) {
-                       numElementsEmitted = state;
+               public void restoreState(List<Integer> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.numElementsEmitted = state.get(0);
                }
 
                public static void reset() {
@@ -388,7 +393,7 @@ public class WindowCheckpointingITCase extends TestLogger {
        }
 
        private static class ValidatingSink extends 
RichSinkFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<HashMap<Long, Integer>> {
+                       implements ListCheckpointed<HashMap<Long, Integer>> {
 
                private final HashMap<Long, Integer> counts = new HashMap<>();
 
@@ -439,18 +444,20 @@ public class WindowCheckpointingITCase extends TestLogger 
{
                }
 
                @Override
-               public HashMap<Long, Integer> snapshotState(long checkpointId, 
long checkpointTimestamp) {
-                       return this.counts;
+               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.counts);
                }
 
                @Override
-               public void restoreState(HashMap<Long, Integer> state) {
-                       this.counts.putAll(state);
+               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.counts.putAll(state.get(0));
 
-                       for (Integer i : state.values()) {
+                       for (Integer i : state.get(0).values()) {
                                this.aggCount += i;
                        }
-
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index f91582f..52a3ba8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -21,13 +21,15 @@ package org.apache.flink.test.classloading.jar;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.lang.RuntimeException;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * A simple streaming program, which is using the state checkpointing of Flink.
@@ -57,7 +59,7 @@ public class CheckpointedStreamingProgram {
 
 
        // with Checkpoining
-       public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+       public static class SimpleStringGenerator implements SourceFunction<String>, ListCheckpointed<Integer> {
                public boolean running = true;
 
                @Override
@@ -74,32 +76,36 @@ public class CheckpointedStreamingProgram {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return null;
+               public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+                       return Collections.emptyList();
                }
 
                @Override
-               public void restoreState(Integer state) {
+               public void restoreState(List<Integer> state) throws Exception {
 
                }
        }
 
-       public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+       public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {
 
                private String someState;
                private boolean atLeastOneSnapshotComplete = false;
                private boolean restored = false;
 
                @Override
-               public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return this;
+               public List<StatefulMapper> snapshotState(long checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this);
                }
 
                @Override
-               public void restoreState(StatefulMapper state) {
+               public void restoreState(List<StatefulMapper> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+                       }
                        restored = true;
-                       this.someState = state.someState;
-                       this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+                       StatefulMapper s = state.get(0);
+                       this.someState = s.someState;
+                       this.atLeastOneSnapshotComplete = s.atLeastOneSnapshotComplete;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index 6796cb0..d3baa7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -42,6 +42,8 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class CheckpointingCustomKvStateProgram {
@@ -84,7 +86,7 @@ public class CheckpointingCustomKvStateProgram {
                env.execute();
        }
 
-       private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, Checkpointed<Integer> {
+       private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, ListCheckpointed<Integer> {
                private static final long serialVersionUID = -7517574288730066280L;
                private volatile boolean running = true;
 
@@ -104,17 +106,18 @@ public class CheckpointingCustomKvStateProgram {
                }
 
                @Override
-               public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return 0;
+               public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(0);
                }
 
                @Override
-               public void restoreState(Integer state) throws Exception {
+               public void restoreState(List<Integer> state) throws Exception {
 
                }
        }
 
-       private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer> implements Checkpointed<ReducingStateFlatMap>, CheckpointListener {
+       private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>
+                       implements ListCheckpointed<ReducingStateFlatMap>, CheckpointListener {
 
                private static final long serialVersionUID = -5939722892793950253L;
                private transient ReducingState<Integer> kvState;
@@ -148,12 +151,12 @@ public class CheckpointingCustomKvStateProgram {
                }
 
                @Override
-               public ReducingStateFlatMap snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return this;
+               public List<ReducingStateFlatMap> snapshotState(long checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this);
                }
 
                @Override
-               public void restoreState(ReducingStateFlatMap state) throws Exception {
+               public void restoreState(List<ReducingStateFlatMap> state) throws Exception {
                        restored = true;
                        atLeastOneSnapshotComplete = true;
                }

Reply via email to