This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new b9a866e  [FLINK-10490][tests] OperatorSnapshotUtil should use 
SavepointV2Serializer
b9a866e is described below

commit b9a866e3ba534dfdc4fb63a956402b8526390e36
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Oct 23 19:12:58 2018 +0200

    [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Serializer
    
    Please not that state written with OperatorSnapshotUtil before this commit 
was written in the V1 format.
    Now we are using the current V2.
    
    This closes #6910.
---
 .../savepoint/SavepointV2Serializer.java           | 23 +++++++++++++++-------
 .../flink/streaming/util/OperatorSnapshotUtil.java | 22 ++++++++++-----------
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index faee588..fa84077 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -67,7 +69,9 @@ import java.util.UUID;
  *  +--------------+---------------------+---------+------+---------------+
  * </pre>
  */
-class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+@Internal
+@VisibleForTesting
+public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> 
{
 
        /** Random magic number for consistency checks */
        private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
@@ -320,7 +324,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                keyedStateStream);
        }
 
-       private static void serializeKeyedStateHandle(
+       @VisibleForTesting
+       public static void serializeKeyedStateHandle(
                        KeyedStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
 
                if (stateHandle == null) {
@@ -380,7 +385,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                return result;
        }
 
-       private static KeyedStateHandle 
deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+       @VisibleForTesting
+       public static KeyedStateHandle 
deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
                final int type = dis.readByte();
                if (NULL_HANDLE == type) {
 
@@ -433,7 +439,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                }
        }
 
-       private static void serializeOperatorStateHandle(
+       @VisibleForTesting
+       public static void serializeOperatorStateHandle(
                OperatorStateHandle stateHandle, DataOutputStream dos) throws 
IOException {
 
                if (stateHandle != null) {
@@ -461,7 +468,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                }
        }
 
-       private static OperatorStateHandle deserializeOperatorStateHandle(
+       @VisibleForTesting
+       public static OperatorStateHandle deserializeOperatorStateHandle(
                        DataInputStream dis) throws IOException {
 
                final int type = dis.readByte();
@@ -492,7 +500,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                }
        }
 
-       private static void serializeStreamStateHandle(
+       @VisibleForTesting
+       public static void serializeStreamStateHandle(
                        StreamStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
 
                if (stateHandle == null) {
@@ -518,7 +527,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                dos.flush();
        }
 
-       private static StreamStateHandle 
deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+       public static StreamStateHandle 
deserializeStreamStateHandle(DataInputStream dis) throws IOException {
                final int type = dis.read();
                if (NULL_HANDLE == type) {
                        return null;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 1b5113d..53627d5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 
@@ -55,13 +55,13 @@ public class OperatorSnapshotUtil {
                        dos.writeInt(0);
 
                        // still required for compatibility
-                       SavepointV1Serializer.serializeStreamStateHandle(null, 
dos);
+                       SavepointV2Serializer.serializeStreamStateHandle(null, 
dos);
 
                        Collection<OperatorStateHandle> rawOperatorState = 
state.getRawOperatorState();
                        if (rawOperatorState != null) {
                                dos.writeInt(rawOperatorState.size());
                                for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
-                                       
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                                       
SavepointV2Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
                                }
                        } else {
                                // this means no states, not even an empty list
@@ -72,7 +72,7 @@ public class OperatorSnapshotUtil {
                        if (managedOperatorState != null) {
                                dos.writeInt(managedOperatorState.size());
                                for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
-                                       
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                                       
SavepointV2Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
                                }
                        } else {
                                // this means no states, not even an empty list
@@ -83,7 +83,7 @@ public class OperatorSnapshotUtil {
                        if (rawKeyedState != null) {
                                dos.writeInt(rawKeyedState.size());
                                for (KeyedStateHandle keyedStateHandle : 
rawKeyedState) {
-                                       
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+                                       
SavepointV2Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
                                }
                        } else {
                                // this means no operator states, not even an 
empty list
@@ -94,7 +94,7 @@ public class OperatorSnapshotUtil {
                        if (managedKeyedState != null) {
                                dos.writeInt(managedKeyedState.size());
                                for (KeyedStateHandle keyedStateHandle : 
managedKeyedState) {
-                                       
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+                                       
SavepointV2Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
                                }
                        } else {
                                // this means no operator states, not even an 
empty list
@@ -113,14 +113,14 @@ public class OperatorSnapshotUtil {
                        dis.readInt();
 
                        // still required for compatibility to consume the 
bytes.
-                       SavepointV1Serializer.deserializeStreamStateHandle(dis);
+                       SavepointV2Serializer.deserializeStreamStateHandle(dis);
 
                        List<OperatorStateHandle> rawOperatorState = null;
                        int numRawOperatorStates = dis.readInt();
                        if (numRawOperatorStates >= 0) {
                                rawOperatorState = new ArrayList<>();
                                for (int i = 0; i < numRawOperatorStates; i++) {
-                                       OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
+                                       OperatorStateHandle operatorState = 
SavepointV2Serializer.deserializeOperatorStateHandle(
                                                dis);
                                        rawOperatorState.add(operatorState);
                                }
@@ -131,7 +131,7 @@ public class OperatorSnapshotUtil {
                        if (numManagedOperatorStates >= 0) {
                                managedOperatorState = new ArrayList<>();
                                for (int i = 0; i < numManagedOperatorStates; 
i++) {
-                                       OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
+                                       OperatorStateHandle operatorState = 
SavepointV2Serializer.deserializeOperatorStateHandle(
                                                dis);
                                        managedOperatorState.add(operatorState);
                                }
@@ -142,7 +142,7 @@ public class OperatorSnapshotUtil {
                        if (numRawKeyedStates >= 0) {
                                rawKeyedState = new ArrayList<>();
                                for (int i = 0; i < numRawKeyedStates; i++) {
-                                       KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
+                                       KeyedStateHandle keyedState = 
SavepointV2Serializer.deserializeKeyedStateHandle(
                                                dis);
                                        rawKeyedState.add(keyedState);
                                }
@@ -153,7 +153,7 @@ public class OperatorSnapshotUtil {
                        if (numManagedKeyedStates >= 0) {
                                managedKeyedState = new ArrayList<>();
                                for (int i = 0; i < numManagedKeyedStates; i++) 
{
-                                       KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
+                                       KeyedStateHandle keyedState = 
SavepointV2Serializer.deserializeKeyedStateHandle(
                                                dis);
                                        managedKeyedState.add(keyedState);
                                }

Reply via email to