[FLINK-5969] Add OperatorSnapshotUtil

This has methods for storing/reading OperatorStateHandles, as returned from stream operator test harnesses. This can be used to write binary snapshots for use in state migration tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/611434c6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/611434c6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/611434c6 Branch: refs/heads/release-1.2 Commit: 611434c6fa8e53cac25dd93f568d2670ec4ead72 Parents: 52fb578 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 24 12:31:53 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed May 3 13:50:04 2017 +0200 ---------------------------------------------------------------------- .../savepoint/SavepointV1Serializer.java | 23 ++- .../streaming/util/OperatorSnapshotUtil.java | 156 +++++++++++++++++++ 2 files changed, 172 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index ba1949a..a9fa3c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; @@ -47,7 +49,8 @@ import java.util.Map; * that no default Java serialization is used for serialization. 
Therefore, we * don't rely on any involved Java classes to stay the same. */ -class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { +@Internal +public class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { private static final byte NULL_HANDLE = 0; private static final byte BYTE_STREAM_STATE_HANDLE = 1; @@ -209,7 +212,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { keyedStateStream); } - private static void serializeKeyGroupStateHandle( + @VisibleForTesting + public static void serializeKeyGroupStateHandle( KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle != null) { @@ -225,7 +229,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } } - private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException { + @VisibleForTesting + public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException { final int type = dis.readByte(); if (NULL_HANDLE == type) { return null; @@ -245,7 +250,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } } - private static void serializeOperatorStateHandle( + @VisibleForTesting + public static void serializeOperatorStateHandle( OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle != null) { @@ -273,7 +279,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } } - private static OperatorStateHandle deserializeOperatorStateHandle( + @VisibleForTesting + public static OperatorStateHandle deserializeOperatorStateHandle( DataInputStream dis) throws IOException { final int type = dis.readByte(); @@ -304,7 +311,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } } - private static void serializeStreamStateHandle( + @VisibleForTesting + public static void serializeStreamStateHandle( StreamStateHandle 
stateHandle, DataOutputStream dos) throws IOException { if (stateHandle == null) { @@ -331,7 +339,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { dos.flush(); } - private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException { + @VisibleForTesting + public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException { int type = dis.read(); if (NULL_HANDLE == type) { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java new file mode 100644 index 0000000..1a53598 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.util; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; + +/** + * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles}, + * for use in tests. + */ +public class OperatorSnapshotUtil { + + public static String getResourceFilename(String filename) { + ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader(); + URL resource = cl.getResource(filename); + return resource.getFile(); + } + + public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException { + FileOutputStream out = new FileOutputStream(path); + DataOutputStream dos = new DataOutputStream(out); + + dos.writeInt(state.getOperatorChainIndex()); + + SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); + + Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); + if (rawOperatorState != null) { + dos.writeInt(rawOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : rawOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); + } + + Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState(); + if (managedOperatorState != null) { + dos.writeInt(managedOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); + } + + Collection<KeyGroupsStateHandle> rawKeyedState = state.getRawKeyedState(); + if (rawKeyedState != null) { + dos.writeInt(rawKeyedState.size()); + for (KeyGroupsStateHandle keyedStateHandle : rawKeyedState) { + SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); + } + + Collection<KeyGroupsStateHandle> managedKeyedState = state.getManagedKeyedState(); + if (managedKeyedState != null) { + dos.writeInt(managedKeyedState.size()); + for (KeyGroupsStateHandle keyedStateHandle : managedKeyedState) { + SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); + } + + dos.flush(); + out.close(); + } + + public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException { + FileInputStream in = new FileInputStream(path); + DataInputStream dis = new DataInputStream(in); + int index = dis.readInt(); + + StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis); + + List<OperatorStateHandle> rawOperatorState = null; + int numRawOperatorStates = dis.readInt(); + if (numRawOperatorStates >= 0) { + rawOperatorState = new ArrayList<>(); + for (int i = 0; i < numRawOperatorStates; i++) { + OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + dis); + rawOperatorState.add(operatorState); + } + } + + List<OperatorStateHandle> managedOperatorState = null; + int numManagedOperatorStates = dis.readInt(); + if (numManagedOperatorStates >= 0) { + managedOperatorState = new ArrayList<>(); + for (int i = 0; i < numManagedOperatorStates; i++) { + OperatorStateHandle 
operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + dis); + managedOperatorState.add(operatorState); + } + } + + List<KeyGroupsStateHandle> rawKeyedState = null; + int numRawKeyedStates = dis.readInt(); + if (numRawKeyedStates >= 0) { + rawKeyedState = new ArrayList<>(); + for (int i = 0; i < numRawKeyedStates; i++) { + KeyGroupsStateHandle keyedState = SavepointV1Serializer.deserializeKeyGroupStateHandle( + dis); + rawKeyedState.add(keyedState); + } + } + + List<KeyGroupsStateHandle> managedKeyedState = null; + int numManagedKeyedStates = dis.readInt(); + if (numManagedKeyedStates >= 0) { + managedKeyedState = new ArrayList<>(); + for (int i = 0; i < numManagedKeyedStates; i++) { + KeyGroupsStateHandle keyedState = SavepointV1Serializer.deserializeKeyGroupStateHandle( + dis); + managedKeyedState.add(keyedState); + } + } + + return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState); + } +}