[ https://issues.apache.org/jira/browse/FLINK-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725655#comment-15725655 ]
ASF GitHub Bot commented on FLINK-5041:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2781#discussion_r91053522
--- Diff: flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java ---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint.savepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
+import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
+import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
+import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.migration.state.MigrationStreamStateHandle;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
+import org.apache.flink.migration.util.SerializedValue;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.MultiStreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * <p>
+ * <p>In contrast to previous savepoint versions, this serializer makes sure
+ * that no default Java serialization is used for serialization. Therefore, we
+ * don't rely on any involved Java classes to stay the same.
+ */
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
+
+ public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+ private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
+ private static final StreamStateHandle SIGNAL_1 = new ByteStreamStateHandle("SIGNAL_1", new byte[]{1});
+
+ private static final int MAX_SIZE = 4 * 1024 * 1024;
+
+ private ClassLoader userClassLoader;
--- End diff --
These are shared mutable fields in a singleton instance. Under
concurrent use, different callers will interfere with each other.
I would suggest simply getting rid of these fields.
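
One possible way to follow this suggestion, sketched under assumptions
rather than taken from the pull request: keep the singleton free of
mutable fields and pass per-call context, such as the user class loader,
as a method parameter. The class name StatelessSerializerSketch, the
deserialize signature, and the encode helper below are hypothetical and
are not part of the Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Hypothetical sketch (not Flink code): a singleton with no mutable fields.
 * Per-call context is passed in as an argument instead of being stored.
 */
final class StatelessSerializerSketch {

    // Sharing this instance is safe because it holds no mutable state.
    static final StatelessSerializerSketch INSTANCE = new StatelessSerializerSketch();

    private StatelessSerializerSketch() {}

    /**
     * Reads a class name from the stream and resolves it with the class
     * loader supplied by the caller. The loader lives only on this call's
     * stack, so concurrent callers cannot overwrite each other's value.
     */
    Class<?> deserialize(DataInputStream in, ClassLoader userClassLoader)
            throws IOException, ClassNotFoundException {
        String className = in.readUTF();
        return Class.forName(className, false, userClassLoader);
    }

    /** Illustrative helper: writes a class name in the format read above. */
    static byte[] encode(String className) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(className);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] data = encode("java.lang.String");
        Class<?> loaded = INSTANCE.deserialize(
                new DataInputStream(new ByteArrayInputStream(data)),
                ClassLoader.getSystemClassLoader());
        System.out.println(loaded.getName());
    }
}
```

With this shape, two threads can call INSTANCE.deserialize with different
class loaders at the same time, since nothing is written to the shared
instance. Whether the interface implemented in the pull request allows
such a parameter is not visible in this diff excerpt.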
> Implement savepoint backwards compatibility 1.1 -> 1.2
> ------------------------------------------------------
>
> Key: FLINK-5041
> URL: https://issues.apache.org/jira/browse/FLINK-5041
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> This issue tracks the implementation of backwards compatibility between Flink
> 1.1 and 1.2 releases.
> This task subsumes:
> - Converting old savepoints to new savepoints, including a conversion of
> state handles to their new replacement.
> - Converting keyed state from old backend implementations to their new
> counterparts.
> - Converting operator and function state for all changed operators.