[FLINK-5969] Add OperatorSnapshotUtil

This has methods for storing/reading OperatorStateHandles, as returned
from stream operator test harnesses. This can be used to write binary
snapshots for use in state migration tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/611434c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/611434c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/611434c6

Branch: refs/heads/release-1.2
Commit: 611434c6fa8e53cac25dd93f568d2670ec4ead72
Parents: 52fb578
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Apr 24 12:31:53 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed May 3 13:50:04 2017 +0200

----------------------------------------------------------------------
 .../savepoint/SavepointV1Serializer.java        |  23 ++-
 .../streaming/util/OperatorSnapshotUtil.java    | 156 +++++++++++++++++++
 2 files changed, 172 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index ba1949a..a9fa3c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
@@ -47,7 +49,8 @@ import java.util.Map;
  * that no default Java serialization is used for serialization. Therefore, we
  * don't rely on any involved Java classes to stay the same.
  */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+@Internal
+public class SavepointV1Serializer implements SavepointSerializer<SavepointV1> 
{
 
        private static final byte NULL_HANDLE = 0;
        private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -209,7 +212,8 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                                keyedStateStream);
        }
 
-       private static void serializeKeyGroupStateHandle(
+       @VisibleForTesting
+       public static void serializeKeyGroupStateHandle(
                        KeyGroupsStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
 
                if (stateHandle != null) {
@@ -225,7 +229,8 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                }
        }
 
-       private static KeyGroupsStateHandle 
deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+       @VisibleForTesting
+       public static KeyGroupsStateHandle 
deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
                final int type = dis.readByte();
                if (NULL_HANDLE == type) {
                        return null;
@@ -245,7 +250,8 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                }
        }
 
-       private static void serializeOperatorStateHandle(
+       @VisibleForTesting
+       public static void serializeOperatorStateHandle(
                        OperatorStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
 
                if (stateHandle != null) {
@@ -273,7 +279,8 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                }
        }
 
-       private static OperatorStateHandle deserializeOperatorStateHandle(
+       @VisibleForTesting
+       public static OperatorStateHandle deserializeOperatorStateHandle(
                        DataInputStream dis) throws IOException {
 
                final int type = dis.readByte();
@@ -304,7 +311,8 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                }
        }
 
-       private static void serializeStreamStateHandle(
+       @VisibleForTesting
+       public static void serializeStreamStateHandle(
                        StreamStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
 
                if (stateHandle == null) {
@@ -331,7 +339,8 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                dos.flush();
        }
 
-       private static StreamStateHandle 
deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+       @VisibleForTesting
+       public static StreamStateHandle 
deserializeStreamStateHandle(DataInputStream dis) throws IOException {
                int type = dis.read();
                if (NULL_HANDLE == type) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
new file mode 100644
index 0000000..1a53598
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
+/**
+ * Util for writing/reading {@link 
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * for use in tests.
+ */
+public class OperatorSnapshotUtil {
+
+       public static String getResourceFilename(String filename) {
+               ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader();
+               URL resource = cl.getResource(filename);
+               return resource.getFile();
+       }
+
+       public static void writeStateHandle(OperatorStateHandles state, String 
path) throws IOException {
+               FileOutputStream out = new FileOutputStream(path);
+               DataOutputStream dos = new DataOutputStream(out);
+
+               dos.writeInt(state.getOperatorChainIndex());
+
+               
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
+
+               Collection<OperatorStateHandle> rawOperatorState = 
state.getRawOperatorState();
+               if (rawOperatorState != null) {
+                       dos.writeInt(rawOperatorState.size());
+                       for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
+                               
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                       }
+               } else {
+                       // this means no states, not even an empty list
+                       dos.writeInt(-1);
+               }
+
+               Collection<OperatorStateHandle> managedOperatorState = 
state.getManagedOperatorState();
+               if (managedOperatorState != null) {
+                       dos.writeInt(managedOperatorState.size());
+                       for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
+                               
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                       }
+               } else {
+                       // this means no states, not even an empty list
+                       dos.writeInt(-1);
+               }
+
+               Collection<KeyGroupsStateHandle> rawKeyedState = 
state.getRawKeyedState();
+               if (rawKeyedState != null) {
+                       dos.writeInt(rawKeyedState.size());
+                       for (KeyGroupsStateHandle keyedStateHandle : 
rawKeyedState) {
+                               
SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateHandle, dos);
+                       }
+               } else {
+                       // this means no operator states, not even an empty list
+                       dos.writeInt(-1);
+               }
+
+               Collection<KeyGroupsStateHandle> managedKeyedState = 
state.getManagedKeyedState();
+               if (managedKeyedState != null) {
+                       dos.writeInt(managedKeyedState.size());
+                       for (KeyGroupsStateHandle keyedStateHandle : 
managedKeyedState) {
+                               
SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateHandle, dos);
+                       }
+               } else {
+                       // this means no operator states, not even an empty list
+                       dos.writeInt(-1);
+               }
+
+               dos.flush();
+               out.close();
+       }
+
+       public static OperatorStateHandles readStateHandle(String path) throws 
IOException, ClassNotFoundException {
+               FileInputStream in = new FileInputStream(path);
+               DataInputStream dis = new DataInputStream(in);
+               int index = dis.readInt();
+
+               StreamStateHandle legacyState = 
SavepointV1Serializer.deserializeStreamStateHandle(dis);
+
+               List<OperatorStateHandle> rawOperatorState = null;
+               int numRawOperatorStates = dis.readInt();
+               if (numRawOperatorStates >= 0) {
+                       rawOperatorState = new ArrayList<>();
+                       for (int i = 0; i < numRawOperatorStates; i++) {
+                               OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
+                                               dis);
+                               rawOperatorState.add(operatorState);
+                       }
+               }
+
+               List<OperatorStateHandle> managedOperatorState = null;
+               int numManagedOperatorStates = dis.readInt();
+               if (numManagedOperatorStates >= 0) {
+                       managedOperatorState = new ArrayList<>();
+                       for (int i = 0; i < numManagedOperatorStates; i++) {
+                               OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
+                                               dis);
+                               managedOperatorState.add(operatorState);
+                       }
+               }
+
+               List<KeyGroupsStateHandle> rawKeyedState = null;
+               int numRawKeyedStates = dis.readInt();
+               if (numRawKeyedStates >= 0) {
+                       rawKeyedState = new ArrayList<>();
+                       for (int i = 0; i < numRawKeyedStates; i++) {
+                               KeyGroupsStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyGroupStateHandle(
+                                               dis);
+                               rawKeyedState.add(keyedState);
+                       }
+               }
+
+               List<KeyGroupsStateHandle> managedKeyedState = null;
+               int numManagedKeyedStates = dis.readInt();
+               if (numManagedKeyedStates >= 0) {
+                       managedKeyedState = new ArrayList<>();
+                       for (int i = 0; i < numManagedKeyedStates; i++) {
+                               KeyGroupsStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyGroupStateHandle(
+                                               dis);
+                               managedKeyedState.add(keyedState);
+                       }
+               }
+
+               return new OperatorStateHandles(index, legacyState, 
managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+       }
+}

Reply via email to