[FLINK-5969] Also snapshot legacy state in operator test harness

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c89d4b43
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c89d4b43
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c89d4b43

Branch: refs/heads/release-1.2
Commit: c89d4b43213d7d3e2ad184787061fe3178be5691
Parents: c43fc2a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Apr 24 17:13:49 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed May 3 13:50:04 2017 +0200

----------------------------------------------------------------------
 .../util/AbstractStreamOperatorTestHarness.java   | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c89d4b43/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2df4efd..baced53 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -471,9 +471,25 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                OperatorStateHandle opManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
                OperatorStateHandle opRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
+               // also snapshot legacy state, if any
+               StreamStateHandle legacyStateHandle = null;
+
+               if (operator instanceof StreamCheckpointedOperator) {
+
+                       CheckpointStreamFactory streamFactory = 
stateBackend.createStreamFactory(
+                                       new JobID(),
+                                       "test_op");
+
+                       final 
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+                               ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
+                               legacyStateHandle = 
outStream.closeAndGetHandle();
+               }
+
                return new OperatorStateHandles(
                        0,
-                       null,
+                       legacyStateHandle,
                        keyedManaged != null ? 
Collections.singletonList(keyedManaged) : null,
                        keyedRaw != null ? Collections.singletonList(keyedRaw) 
: null,
                        opManaged != null ? 
Collections.singletonList(opManaged) : null,

Reply via email to