[FLINK-5969] Add OperatorSnapshotUtil: This utility has methods for storing and reading OperatorStateHandles, as returned by stream operator test harnesses. It can be used to write binary snapshots for use in state migration tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2779197f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2779197f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2779197f Branch: refs/heads/master Commit: 2779197f237446e3bff4e9e15f90c24d721c8ab4 Parents: 9ed98f2 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 24 12:31:53 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed May 3 16:24:26 2017 +0200 ---------------------------------------------------------------------- .../savepoint/SavepointV1Serializer.java | 27 ++-- .../streaming/util/OperatorSnapshotUtil.java | 156 +++++++++++++++++++ 2 files changed, 174 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2779197f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index aaa8cdd..f67d54c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; @@ -43,12 +45,13 @@ import java.util.Map; /** * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format) 
- * + * * <p>In contrast to the previous versions, this serializer makes sure that no Java - * serialization is used for serialization. Therefore, we don't rely on any involved + * serialization is used for serialization. Therefore, we don't rely on any involved * classes to stay the same. */ -class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { +@Internal +public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { private static final byte NULL_HANDLE = 0; private static final byte BYTE_STREAM_STATE_HANDLE = 1; @@ -210,7 +213,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { keyedStateStream); } - private static void serializeKeyedStateHandle( + @VisibleForTesting + public static void serializeKeyedStateHandle( KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle == null) { @@ -230,7 +234,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { } } - private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { + @VisibleForTesting + public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { final int type = dis.readByte(); if (NULL_HANDLE == type) { return null; @@ -251,7 +256,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { } } - private static void serializeOperatorStateHandle( + @VisibleForTesting + public static void serializeOperatorStateHandle( OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle != null) { @@ -279,7 +285,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { } } - private static OperatorStateHandle deserializeOperatorStateHandle( + @VisibleForTesting + public static OperatorStateHandle deserializeOperatorStateHandle( DataInputStream dis) throws IOException { final int type = dis.readByte(); @@ -310,7 +317,8 @@ class SavepointV1Serializer 
implements SavepointSerializer<SavepointV2> { } } - private static void serializeStreamStateHandle( + @VisibleForTesting + public static void serializeStreamStateHandle( StreamStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle == null) { @@ -337,7 +345,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { dos.flush(); } - private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException { + @VisibleForTesting + public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException { int type = dis.read(); if (NULL_HANDLE == type) { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/2779197f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java new file mode 100644 index 0000000..92a9452 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; + +/** + * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles}, + * for use in tests. + */ +public class OperatorSnapshotUtil { + + public static String getResourceFilename(String filename) { + ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader(); + URL resource = cl.getResource(filename); + return resource.getFile(); + } + + public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException { + FileOutputStream out = new FileOutputStream(path); + DataOutputStream dos = new DataOutputStream(out); + + dos.writeInt(state.getOperatorChainIndex()); + + SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); + + Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); + if (rawOperatorState != null) { + dos.writeInt(rawOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : rawOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); + } + + Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState(); + if (managedOperatorState != null) { + 
dos.writeInt(managedOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : managedOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); + } + + Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState(); + if (rawKeyedState != null) { + dos.writeInt(rawKeyedState.size()); + for (KeyedStateHandle keyedStateHandle : rawKeyedState) { + SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); + } + + Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState(); + if (managedKeyedState != null) { + dos.writeInt(managedKeyedState.size()); + for (KeyedStateHandle keyedStateHandle : managedKeyedState) { + SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); + } + + dos.flush(); + out.close(); + } + + public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException { + FileInputStream in = new FileInputStream(path); + DataInputStream dis = new DataInputStream(in); + int index = dis.readInt(); + + StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis); + + List<OperatorStateHandle> rawOperatorState = null; + int numRawOperatorStates = dis.readInt(); + if (numRawOperatorStates >= 0) { + rawOperatorState = new ArrayList<>(); + for (int i = 0; i < numRawOperatorStates; i++) { + OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + dis); + rawOperatorState.add(operatorState); + } + } + + List<OperatorStateHandle> managedOperatorState = null; + int numManagedOperatorStates = dis.readInt(); + if (numManagedOperatorStates >= 0) { + managedOperatorState = new ArrayList<>(); + for (int 
i = 0; i < numManagedOperatorStates; i++) { + OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + dis); + managedOperatorState.add(operatorState); + } + } + + List<KeyedStateHandle> rawKeyedState = null; + int numRawKeyedStates = dis.readInt(); + if (numRawKeyedStates >= 0) { + rawKeyedState = new ArrayList<>(); + for (int i = 0; i < numRawKeyedStates; i++) { + KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( + dis); + rawKeyedState.add(keyedState); + } + } + + List<KeyedStateHandle> managedKeyedState = null; + int numManagedKeyedStates = dis.readInt(); + if (numManagedKeyedStates >= 0) { + managedKeyedState = new ArrayList<>(); + for (int i = 0; i < numManagedKeyedStates; i++) { + KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( + dis); + managedKeyedState.add(keyedState); + } + } + + return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState); + } +}