[FLINK-5113] Port functions in tests to new CheckpointedFunction IF. This closes #2939.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/525edf1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/525edf1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/525edf1e Branch: refs/heads/master Commit: 525edf1e6925b55302d991ddf537a2f16caba21d Parents: 570dbc8 Author: kl0u <kklou...@gmail.com> Authored: Tue Nov 22 19:23:33 2016 +0100 Committer: zentol <ches...@apache.org> Committed: Thu Jan 19 23:57:22 2017 +0100 ---------------------------------------------------------------------- .../CassandraTupleWriteAheadSinkExample.java | 22 ++- .../fs/RollingSinkFaultToleranceITCase.java | 18 ++- .../BucketingSinkFaultToleranceITCase.java | 18 ++- .../connectors/kafka/KafkaConsumerTestBase.java | 22 ++- .../kafka/testutils/FailingIdentityMapper.java | 18 ++- .../testutils/ValidatingExactlyOnceSink.java | 26 ++-- .../org/apache/flink/util/CollectionUtil.java | 2 +- .../streaming/api/checkpoint/Checkpointed.java | 3 +- .../runtime/tasks/SourceStreamTaskTest.java | 12 +- ...tractEventTimeWindowCheckpointingITCase.java | 43 ++++-- .../CoStreamCheckpointingITCase.java | 74 ++++++---- .../EventTimeAllWindowCheckpointingITCase.java | 38 +++-- .../PartitionedStateCheckpointingITCase.java | 17 ++- .../test/checkpointing/RescalingITCase.java | 27 ++-- .../test/checkpointing/SavepointITCase.java | 14 +- .../checkpointing/StateCheckpointedITCase.java | 94 +++++++----- .../StreamCheckpointNotifierITCase.java | 31 ++-- .../StreamCheckpointingITCase.java | 79 ++++++---- .../UdfStreamOperatorCheckpointingITCase.java | 14 +- .../WindowCheckpointingITCase.java | 33 +++-- .../jar/CheckpointedStreamingProgram.java | 28 ++-- .../jar/CheckpointingCustomKvStateProgram.java | 21 +-- .../jar/LegacyCheckpointedStreamingProgram.java | 148 +++++++++++++++++++ .../flink/test/recovery/ChaosMonkeyITCase.java | 38 +++-- .../JobManagerHACheckpointRecoveryITCase.java | 41 +++-- ...erProcessFailureStreamingRecoveryITCase.java | 30 ++-- 26 files changed, 608 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java index 811c410..23de949 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java @@ -21,12 +21,14 @@ import com.datastax.driver.core.Cluster; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.cassandra.CassandraSink; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import java.util.Collections; +import java.util.List; import java.util.UUID; /** @@ -50,6 +52,9 @@ public class CassandraTupleWriteAheadSinkExample { .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") .enableWriteAheadLog() .setClusterBuilder(new ClusterBuilder() { + + private static final long serialVersionUID = 2793938419775311824L; + @Override public Cluster buildCluster(Cluster.Builder builder) { return builder.addContactPoint("127.0.0.1").build(); @@ -62,7 +67,9 @@ public class CassandraTupleWriteAheadSinkExample { env.execute(); } - public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> { + public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> { + private static final long serialVersionUID = 4022367939215095610L; + private int counter = 0; private boolean stop = false; @@ -84,13 +91,16 @@ public class CassandraTupleWriteAheadSinkExample { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return counter; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.counter); } @Override - public void restoreState(Integer state) throws Exception { - this.counter = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.counter = state.get(0); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java index 36c0d03..2d8492f 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs; import com.google.common.collect.Sets; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -42,7 +42,9 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Random; import java.util.Set; import java.util.regex.Matcher; @@ -236,7 +238,7 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas } private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> { + implements ListCheckpointed<Integer> { private static final long serialVersionUID = 1L; @@ -246,7 +248,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas private volatile boolean isRunning = true; - StringGeneratingSourceFunction(long numElements) { this.numElements = numElements; } @@ -288,13 +289,16 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java index 54703a3..85f23b6 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing; import com.google.common.collect.Sets; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -42,7 +42,9 @@ import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.io.BufferedReader; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Random; import java.util.Set; import java.util.regex.Matcher; @@ -233,7 +235,7 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB } private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> { + implements ListCheckpointed<Integer> { private static final long serialVersionUID = 1L; @@ -243,7 +245,6 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB private volatile boolean isRunning = true; - StringGeneratingSourceFunction(long numElements) { this.numElements = numElements; } @@ -285,13 +286,16 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index aa7ea49..d7fab88 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -51,21 +51,16 @@ import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; @@ -73,7 +68,6 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; @@ -1925,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { public static class BrokerKillingMapper<T> extends RichMapFunction<T,T> - implements Checkpointed<Integer>, CheckpointListener { + implements ListCheckpointed<Integer>, CheckpointListener { private static final long serialVersionUID = 6334389850158707313L; @@ -1939,7 +1933,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { private boolean failer; private boolean hasBeenCheckpointed; - public BrokerKillingMapper(int shutdownBrokerId, int failCount) { this.shutdownBrokerId = shutdownBrokerId; this.failCount = failCount; @@ -1994,13 +1987,16 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsTotal; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.numElementsTotal); } @Override - public void restoreState(Integer state) { - this.numElementsTotal = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.numElementsTotal = state.get(0); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java index 2bd400c..ec64b00 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java @@ -21,13 +21,16 @@ package org.apache.flink.streaming.connectors.kafka.testutils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; + public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements - Checkpointed<Integer>, CheckpointListener, Runnable { + ListCheckpointed<Integer>, CheckpointListener, Runnable { private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class); @@ -89,13 +92,16 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsTotal; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(numElementsTotal); } @Override - public void restoreState(Integer state) { - numElementsTotal = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.numElementsTotal = state.get(0); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java index 7813561..46e70fd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java @@ -19,15 +19,17 @@ package org.apache.flink.streaming.connectors.kafka.testutils; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.test.util.SuccessException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.BitSet; +import java.util.Collections; +import java.util.List; -public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> { +public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> { private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class); @@ -39,7 +41,6 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme private int numElements; // this is checkpointed - public ValidatingExactlyOnceSink(int numElementsTotal) { this.numElementsTotal = numElementsTotal; } @@ -68,15 +69,20 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme } @Override - public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) { - LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId); - return new Tuple2<>(numElements, duplicateChecker); + public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception { + LOG.info("Snapshot of counter " + numElements + " at checkpoint " + checkpointId); + return Collections.singletonList(new Tuple2<>(numElements, duplicateChecker)); } @Override - public void restoreState(Tuple2<Integer, BitSet> state) { - LOG.info("restoring num elements to {}", state.f0); - this.numElements = state.f0; - this.duplicateChecker = state.f1; + public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + + Tuple2<Integer, BitSet> s = state.get(0); + LOG.info("restoring num elements to {}", s.f0); + this.numElements = s.f0; + this.duplicateChecker = s.f1; } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java index 15d00ae..cd5c2e5 100644 --- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java @@ -34,4 +34,4 @@ public final class CollectionUtil { public static boolean isNullOrEmpty(Map<?, ?> map) { return map == null || map.isEmpty(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java index 7af5cea..fb67ea7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java @@ -31,8 +31,7 @@ import java.io.Serializable; * state is written, the function is not called, so the function needs not return a * copy of its state, but may return a reference to its state. Functions that can * continue to work and mutate the state, even while the state snapshot is being accessed, - * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously} - * interface.</p> + * can implement the {@link CheckpointedAsynchronously} interface.</p> * * @param <T> The type of the operator state. */ http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index b592fe8..dd1fe58 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -34,6 +34,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; @@ -135,7 +136,7 @@ public class SourceStreamTaskTest { } } - private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> { + private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> { private static final long serialVersionUID = 1; private int maxElements; @@ -181,7 +182,7 @@ public class SourceStreamTaskTest { } @Override - public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public List<Serializable> snapshotState(long checkpointId, long timestamp) throws Exception { if (!semaphore.tryAcquire()) { Assert.fail("Concurrent invocation of snapshotState."); } @@ -199,11 +200,12 @@ public class SourceStreamTaskTest { Assert.fail("Count is different at start end end of snapshot."); } semaphore.release(); - return sum; + return Collections.<Serializable>singletonList(sum); } @Override - public void restoreState(Serializable state) {} + public void restoreState(List<Serializable> state) throws Exception { + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 583e42f..1911f44 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -53,7 +53,9 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.flink.test.util.TestUtils.tryExecute; @@ -495,7 +497,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog // ------------------------------------------------------------------------ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointListener + implements ListCheckpointed<Integer>, CheckpointListener { private static volatile boolean failedBefore = false; @@ -567,13 +569,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsEmitted; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.numElementsEmitted); } @Override - public void restoreState(Integer state) { - numElementsEmitted = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.numElementsEmitted = state.get(0); } public static void reset() { @@ -582,7 +587,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements Checkpointed<HashMap<Long, Integer>> { + implements ListCheckpointed<HashMap<Long, Integer>> { private final HashMap<Long, Integer> windowCounts = new HashMap<>(); @@ -676,19 +681,22 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } @Override - public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { - return this.windowCounts; + public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.windowCounts); } @Override - public void restoreState(HashMap<Long, Integer> state) { - this.windowCounts.putAll(state); + public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + windowCounts.putAll(state.get(0)); } } // Sink for validating the stateful window counts private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements Checkpointed<HashMap<Long, Integer>> { + implements ListCheckpointed<HashMap<Long, Integer>> { private final HashMap<Long, Integer> windowCounts = new HashMap<>(); @@ -757,13 +765,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } @Override - public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { - return this.windowCounts; + public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.windowCounts); } @Override - public void restoreState(HashMap<Long, Integer> state) { - this.windowCounts.putAll(state); + public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.windowCounts.putAll(state.get(0)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java index 51a00b9..06d3ab0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; @@ -38,6 +38,8 @@ import org.junit.Test; import java.io.IOException; import java.io.Serializable; +import java.util.Collections; +import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -142,11 +144,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa /** * A generating source that is slow before the first two checkpoints went through * and will indefinitely stall at a certain point to allow the checkpoint to complete. - * + * * After the checkpoints are through, it continues with full speed. */ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements Checkpointed<Integer>, CheckpointListener { + implements ListCheckpointed<Integer>, CheckpointListener { private static volatile int numCompletedCheckpoints = 0; @@ -210,13 +212,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } @Override @@ -239,12 +244,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; private long count; - @Override public PrefixCount map(PrefixCount value) throws Exception { @@ -258,13 +262,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } @@ -303,7 +310,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } } - private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> { + private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; @@ -321,18 +328,21 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } - private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> { - + private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements ListCheckpointed<Long> { + static final long[] counts = new long[PARALLELISM]; private long count; @@ -344,13 +354,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } @Override @@ -359,7 +372,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } } - private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> { + private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; @@ -378,13 +391,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index b493e42..09c1437 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -43,14 +43,16 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.*; /** - * This verfies that checkpointing works correctly with event time windows. + * This verifies that checkpointing works correctly with event time windows. * * <p> * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows. @@ -432,7 +434,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { // ------------------------------------------------------------------------ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointListener + implements ListCheckpointed<Integer>, CheckpointListener { private static volatile boolean failedBefore = false; @@ -502,23 +504,26 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { numSuccessfulCheckpoints++; } - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsEmitted; + public static void reset() { + failedBefore = false; } @Override - public void restoreState(Integer state) { - numElementsEmitted = state; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.numElementsEmitted); } - public static void reset() { - failedBefore = false; + @Override + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.numElementsEmitted = state.get(0); } } private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements Checkpointed<HashMap<Long, Integer>> { + implements ListCheckpointed<HashMap<Long, Integer>> { private final HashMap<Long, Integer> windowCounts = new HashMap<>(); @@ -612,13 +617,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { } @Override - public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { - return this.windowCounts; + public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.windowCounts); } @Override - public void restoreState(HashMap<Long, Integer> state) { - this.windowCounts.putAll(state); + public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.windowCounts.putAll(state.get(0)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java index 0728b41..4761d70 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; @@ -33,7 +35,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -86,7 +88,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes // -------------------------------------------------------------------------------------------- private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> - implements Checkpointed<Integer> { + implements ListCheckpointed<Integer> { private final long numElements; @@ -133,13 +135,16 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index bd1678e..8045d82 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; @@ -66,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -763,7 +763,7 @@ public class RescalingITCase extends TestLogger { } } - private static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements Checkpointed<Integer> { + private static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements ListCheckpointed<Integer> { private static final long serialVersionUID = 8388073059042040203L; @@ -772,13 +772,16 @@ public class RescalingITCase extends TestLogger { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return counter; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.counter); } @Override - public void restoreState(Integer state) throws Exception { - counter = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.counter = state.get(0); } } @@ -879,18 +882,20 @@ public class RescalingITCase extends TestLogger { } } - private static class NonPartitionedStateSource extends StateSourceBase implements Checkpointed<Integer> { + private static class NonPartitionedStateSource extends StateSourceBase implements ListCheckpointed<Integer> { private static final long serialVersionUID = -8108185918123186841L; @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return counter; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.counter); } @Override - public void restoreState(Integer state) throws Exception { - counter = state; + public void restoreState(List<Integer> state) throws Exception { + if (!state.isEmpty()) { + this.counter = state.get(0); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 9f957e5..77777d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -63,7 +63,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -527,7 +526,7 @@ public class SavepointITCase extends TestLogger { private static class StatefulCounter extends RichMapFunction<Integer, Integer> - implements Checkpointed<byte[]>, CheckpointListener { + implements ListCheckpointed<byte[]>, CheckpointListener { private static final Object checkpointLock = new Object(); private static int numCompleteCalls; @@ -556,13 +555,16 @@ public class SavepointITCase extends TestLogger { } @Override - public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return data; + public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(data); } @Override - public void restoreState(byte[] data) throws Exception { - this.data = data; + public void restoreState(List<byte[]> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.data = state.get(0); synchronized (checkpointLock) { if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) { http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java index 9d37b59..32d9e23 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java @@ -24,8 +24,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -34,9 +33,10 @@ import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; @@ -47,7 +47,7 @@ import static org.junit.Assert.assertTrue; * A simple test that runs a streaming topology with checkpointing enabled. * * The test triggers a failure after a while and verifies that, after completion, the - * state defined with either the {@link ValueState} or the {@link Checkpointed} + * state defined with either the {@link ValueState} or the {@link ListCheckpointed} * interface reflects the "exactly once" semantics. * * The test throttles the input until at least two checkpoints are completed, to make sure that @@ -139,15 +139,14 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { // -------------------------------------------------------------------------------------------- private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> + implements ListCheckpointed<Integer> { private final long numElements; private int index; private volatile boolean isRunning = true; - - + StringGeneratingSourceFunction(long numElements) { this.numElements = numElements; } @@ -197,23 +196,26 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } } private static class StringRichFilterFunction extends RichFilterFunction<String> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; private long count; - + @Override public boolean filter(String value) throws Exception { count++; @@ -226,23 +228,26 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> - implements CheckpointedAsynchronously<Long> { + implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; private long count; - + @Override public PrefixCount map(String value) { count++; @@ -255,18 +260,21 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; @@ -284,18 +292,21 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointListener { + implements ListCheckpointed<HashMap<String, PrefixCount>>, CheckpointListener { static boolean wasCheckpointedBeforeFailure = false; @@ -307,7 +318,6 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { private long count; private boolean wasCheckpointed; - OnceFailingAggregator(long failurePos) { this.failurePos = failurePos; @@ -339,13 +349,16 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } @Override - public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) { - return aggregationMap; + public List<HashMap<String, PrefixCount>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.aggregationMap); } @Override - public void restoreState(HashMap<String, PrefixCount> state) { - aggregationMap.putAll(state); + public void restoreState(List<HashMap<String, PrefixCount>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.aggregationMap.putAll(state.get(0)); } @Override @@ -355,7 +368,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } private static class ValidatingSink extends RichSinkFunction<PrefixCount> - implements Checkpointed<HashMap<Character, Long>> { + implements ListCheckpointed<HashMap<Character, Long>> { @SuppressWarnings("unchecked") private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM]; @@ -379,13 +392,16 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } @Override - public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) { - return counts; + public List<HashMap<Character, Long>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.counts); } @Override - public void restoreState(HashMap<Character, Long> state) { - counts.putAll(state); + public void restoreState(List<HashMap<Character, Long>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.counts.putAll(state.get(0)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 6bf511f..be3fac5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -205,7 +206,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger { * interface it stores all the checkpoint ids it has seen in a static list. */ private static class GeneratingSourceFunction extends RichSourceFunction<Long> - implements ParallelSourceFunction<Long>, CheckpointListener, Checkpointed<Integer> { + implements ParallelSourceFunction<Long>, CheckpointListener, ListCheckpointed<Integer> { static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); @@ -263,13 +264,16 @@ public class StreamCheckpointNotifierITCase extends TestLogger { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } @Override @@ -390,7 +394,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger { * Reducer that causes one failure between seeing 40% to 70% of the records. */ private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> - implements Checkpointed<Long>, CheckpointListener + implements ListCheckpointed<Long>, CheckpointListener { static volatile boolean hasFailed = false; static volatile long failureCheckpointID; @@ -402,7 +406,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger { private volatile long count; private volatile boolean notificationAlready; - + OnceFailingReducer(long numElements) { this.failurePos = (long) (0.5 * numElements / PARALLELISM); } @@ -419,19 +423,22 @@ public class StreamCheckpointNotifierITCase extends TestLogger { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) { LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<"); hasFailed = true; failureCheckpointID = checkpointId; throw new Exception("Test Failure"); } - return count; + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index b97e1f2..aae04c9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -41,7 +43,7 @@ import static org.junit.Assert.assertEquals; * A simple test that runs a streaming topology with checkpointing enabled. * * The test triggers a failure after a while and verifies that, after completion, the - * state defined with the {@link Checkpointed} interface reflects the "exactly once" semantics. + * state defined with the {@link ListCheckpointed} interface reflects the "exactly once" semantics. */ @SuppressWarnings("serial") public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { @@ -118,7 +120,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { // -------------------------------------------------------------------------------------------- private static class StringGeneratingSourceFunction extends RichSourceFunction<String> - implements ParallelSourceFunction<String>, Checkpointed<Integer> { + implements ParallelSourceFunction<String>, ListCheckpointed<Integer> { private final long numElements; @@ -131,7 +133,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { private volatile boolean isRunning = true; static final long[] counts = new long[PARALLELISM]; - + @Override public void close() throws IOException { counts[getRuntimeContext().getIndexOfThisSubtask()] = index; @@ -186,17 +188,20 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.index); } @Override - public void restoreState(Integer state) { - index = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.index = state.get(0); } } - private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> { + private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements ListCheckpointed<Long> { private long count; static final long[] counts = new long[PARALLELISM]; @@ -213,13 +218,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } @@ -227,7 +235,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { * This function uses simultaneously the key/value state and is checkpointed. */ private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>(); static final long[] counts = new long[PARALLELISM]; @@ -238,7 +246,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { private long failurePos; private long count; - + private ValueState<Long> pCount; private long inputCount; @@ -279,22 +287,25 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return inputCount; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.inputCount); } @Override - public void restoreState(Long state) { - inputCount = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.inputCount = state.get(0); } } - private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> { + private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; private long count; - + @Override public boolean filter(String value) { count++; @@ -307,23 +318,26 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { static final long[] counts = new long[PARALLELISM]; private long count; - + @Override public PrefixCount map(String value) throws IOException { count++; @@ -336,13 +350,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java index 5874f56..dae5cd9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -146,7 +145,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe * augmented by the designated parallel subtaskId. The source is not parallel to ensure order. */ private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { private long count; @@ -168,13 +167,16 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe public void cancel() {} @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.count = state.get(0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index e424a8d..a45349d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -46,7 +46,9 @@ import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -310,7 +312,7 @@ public class WindowCheckpointingITCase extends TestLogger { // ------------------------------------------------------------------------ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointListener + implements ListCheckpointed<Integer>, CheckpointListener { private static volatile boolean failedBefore = false; @@ -373,13 +375,16 @@ public class WindowCheckpointingITCase extends TestLogger { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsEmitted; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.numElementsEmitted); } @Override - public void restoreState(Integer state) { - numElementsEmitted = state; + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.numElementsEmitted = state.get(0); } public static void reset() { @@ -388,7 +393,7 @@ public class WindowCheckpointingITCase extends TestLogger { } private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>> - implements Checkpointed<HashMap<Long, Integer>> { + implements ListCheckpointed<HashMap<Long, Integer>> { private final HashMap<Long, Integer> counts = new HashMap<>(); @@ -439,18 +444,20 @@ public class WindowCheckpointingITCase extends TestLogger { } @Override - public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { - return this.counts; + public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.counts); } @Override - public void restoreState(HashMap<Long, Integer> state) { - this.counts.putAll(state); + public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.counts.putAll(state.get(0)); - for (Integer i : state.values()) { + for (Integer i : state.get(0).values()) { this.aggCount += i; } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java index f91582f..52a3ba8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java @@ -21,13 +21,15 @@ package org.apache.flink.test.classloading.jar; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.lang.RuntimeException; +import java.util.Collections; +import java.util.List; /** * A simple streaming program, which is using the state checkpointing of Flink. @@ -57,7 +59,7 @@ public class CheckpointedStreamingProgram { // with Checkpoining - public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> { + public static class SimpleStringGenerator implements SourceFunction<String>, ListCheckpointed<Integer> { public boolean running = true; @Override @@ -74,32 +76,36 @@ public class CheckpointedStreamingProgram { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return null; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.emptyList(); } @Override - public void restoreState(Integer state) { + public void restoreState(List<Integer> state) throws Exception { } } - public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener { + public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener { private String someState; private boolean atLeastOneSnapshotComplete = false; private boolean restored = false; @Override - public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return this; + public List<StatefulMapper> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this); } @Override - public void restoreState(StatefulMapper state) { + public void restoreState(List<StatefulMapper> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } restored = true; - this.someState = state.someState; - this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete; + StatefulMapper s = state.get(0); + this.someState = s.someState; + this.atLeastOneSnapshotComplete = s.atLeastOneSnapshotComplete; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java index 6796cb0..d3baa7d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java @@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; @@ -42,6 +42,8 @@ import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; public class CheckpointingCustomKvStateProgram { @@ -84,7 +86,7 @@ public class CheckpointingCustomKvStateProgram { env.execute(); } - private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, Checkpointed<Integer> { + private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, ListCheckpointed<Integer> { private static final long serialVersionUID = -7517574288730066280L; private volatile boolean running = true; @@ -104,17 +106,18 @@ public class CheckpointingCustomKvStateProgram { } @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return 0; + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(0); } @Override - public void restoreState(Integer state) throws Exception { + public void restoreState(List<Integer> state) throws Exception { } } - private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer> implements Checkpointed<ReducingStateFlatMap>, CheckpointListener { + private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer> + implements ListCheckpointed<ReducingStateFlatMap>, CheckpointListener { private static final long serialVersionUID = -5939722892793950253L; private transient ReducingState<Integer> kvState; @@ -148,12 +151,12 @@ public class CheckpointingCustomKvStateProgram { } @Override - public ReducingStateFlatMap snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return this; + public List<ReducingStateFlatMap> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this); } @Override - public void restoreState(ReducingStateFlatMap state) throws Exception { + public void restoreState(List<ReducingStateFlatMap> state) throws Exception { restored = true; atLeastOneSnapshotComplete = true; }