This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push: new b9a866e [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Serializer b9a866e is described below commit b9a866e3ba534dfdc4fb63a956402b8526390e36 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Tue Oct 23 19:12:58 2018 +0200 [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Serializer Please not that state written with OperatorSnapshotUtil before this commit was written in the V1 format. Now we are using the current V2. This closes #6910. --- .../savepoint/SavepointV2Serializer.java | 23 +++++++++++++++------- .../flink/streaming/util/OperatorSnapshotUtil.java | 22 ++++++++++----------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index faee588..fa84077 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; @@ -67,7 +69,9 @@ import java.util.UUID; * +--------------+---------------------+---------+------+---------------+ * </pre> */ -class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { +@Internal +@VisibleForTesting +public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { /** Random magic number for consistency checks */ private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696; @@ -320,7 +324,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { keyedStateStream); } - private static void serializeKeyedStateHandle( + @VisibleForTesting + public static void serializeKeyedStateHandle( KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle == null) { @@ -380,7 +385,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { return result; } - private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { + @VisibleForTesting + public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { final int type = dis.readByte(); if (NULL_HANDLE == type) { @@ -433,7 +439,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { } } - private static void serializeOperatorStateHandle( + @VisibleForTesting + public static void serializeOperatorStateHandle( OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle != null) { @@ -461,7 +468,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { } } - private static OperatorStateHandle deserializeOperatorStateHandle( + @VisibleForTesting + public static OperatorStateHandle deserializeOperatorStateHandle( DataInputStream dis) throws IOException { final int type = dis.readByte(); @@ -492,7 +500,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { } } - private static void serializeStreamStateHandle( + @VisibleForTesting + public static void serializeStreamStateHandle( StreamStateHandle stateHandle, DataOutputStream dos) throws IOException { if (stateHandle == null) { @@ -518,7 +527,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { dos.flush(); } - private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException { + public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException { final int type = dis.read(); if (NULL_HANDLE == type) { return null; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java index 1b5113d..53627d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -55,13 +55,13 @@ public class OperatorSnapshotUtil { dos.writeInt(0); // still required for compatibility - SavepointV1Serializer.serializeStreamStateHandle(null, dos); + SavepointV2Serializer.serializeStreamStateHandle(null, dos); Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); if (rawOperatorState != null) { dos.writeInt(rawOperatorState.size()); for (OperatorStateHandle operatorStateHandle : rawOperatorState) { - SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + SavepointV2Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); } } else { // this means no states, not even an empty list @@ -72,7 +72,7 @@ public class OperatorSnapshotUtil { if (managedOperatorState != null) { dos.writeInt(managedOperatorState.size()); for (OperatorStateHandle operatorStateHandle : managedOperatorState) { - SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + SavepointV2Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); } } else { // this means no states, not even an empty list @@ -83,7 +83,7 @@ public class OperatorSnapshotUtil { if (rawKeyedState != null) { dos.writeInt(rawKeyedState.size()); for (KeyedStateHandle keyedStateHandle : rawKeyedState) { - SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + SavepointV2Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); } } else { // this means no operator states, not even an empty list @@ -94,7 +94,7 @@ public class OperatorSnapshotUtil { if (managedKeyedState != null) { dos.writeInt(managedKeyedState.size()); for (KeyedStateHandle keyedStateHandle : managedKeyedState) { - SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + SavepointV2Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); } } else { // this means no operator states, not even an empty list @@ -113,14 +113,14 @@ public class OperatorSnapshotUtil { dis.readInt(); // still required for compatibility to consume the bytes. - SavepointV1Serializer.deserializeStreamStateHandle(dis); + SavepointV2Serializer.deserializeStreamStateHandle(dis); List<OperatorStateHandle> rawOperatorState = null; int numRawOperatorStates = dis.readInt(); if (numRawOperatorStates >= 0) { rawOperatorState = new ArrayList<>(); for (int i = 0; i < numRawOperatorStates; i++) { - OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + OperatorStateHandle operatorState = SavepointV2Serializer.deserializeOperatorStateHandle( dis); rawOperatorState.add(operatorState); } @@ -131,7 +131,7 @@ public class OperatorSnapshotUtil { if (numManagedOperatorStates >= 0) { managedOperatorState = new ArrayList<>(); for (int i = 0; i < numManagedOperatorStates; i++) { - OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + OperatorStateHandle operatorState = SavepointV2Serializer.deserializeOperatorStateHandle( dis); managedOperatorState.add(operatorState); } @@ -142,7 +142,7 @@ public class OperatorSnapshotUtil { if (numRawKeyedStates >= 0) { rawKeyedState = new ArrayList<>(); for (int i = 0; i < numRawKeyedStates; i++) { - KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( + KeyedStateHandle keyedState = SavepointV2Serializer.deserializeKeyedStateHandle( dis); rawKeyedState.add(keyedState); } @@ -153,7 +153,7 @@ public class OperatorSnapshotUtil { if (numManagedKeyedStates >= 0) { managedKeyedState = new ArrayList<>(); for (int i = 0; i < numManagedKeyedStates; i++) { - KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( + KeyedStateHandle keyedState = SavepointV2Serializer.deserializeKeyedStateHandle( dis); managedKeyedState.add(keyedState); }