[FLINK-5969] Also snapshot legacy state in operator test harness
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c89d4b43 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c89d4b43 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c89d4b43 Branch: refs/heads/release-1.2 Commit: c89d4b43213d7d3e2ad184787061fe3178be5691 Parents: c43fc2a Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 24 17:13:49 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed May 3 13:50:04 2017 +0200 ---------------------------------------------------------------------- .../util/AbstractStreamOperatorTestHarness.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c89d4b43/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 2df4efd..baced53 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -471,9 +471,25 @@ public class AbstractStreamOperatorTestHarness<OUT> { OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture()); OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture()); + // also snapshot legacy state, if any + StreamStateHandle legacyStateHandle = null; + + if (operator instanceof StreamCheckpointedOperator) { + + CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory( + new JobID(), + "test_op"); + + final CheckpointStreamFactory.CheckpointStateOutputStream outStream = + streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + + ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); + legacyStateHandle = outStream.closeAndGetHandle(); + } + return new OperatorStateHandles( 0, - null, + legacyStateHandle, keyedManaged != null ? Collections.singletonList(keyedManaged) : null, keyedRaw != null ? Collections.singletonList(keyedRaw) : null, opManaged != null ? Collections.singletonList(opManaged) : null,