Repository: flink
Updated Branches:
  refs/heads/master 390d36132 -> c11f11359


[FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend.

This closes #5930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c11f1135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c11f1135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c11f1135

Branch: refs/heads/master
Commit: c11f11359b8915533ad886015d57298e3daeb821
Parents: 390d361
Author: sihuazhou <[email protected]>
Authored: Fri Apr 27 20:42:33 2018 +0800
Committer: Stefan Richter <[email protected]>
Committed: Wed May 2 12:18:36 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/DefaultOperatorStateBackend.java |  2 +-
 .../flink/runtime/state/HeapBroadcastState.java    |  2 +-
 .../RegisteredBroadcastBackendStateMetaInfo.java   | 17 +++++++++++++++++
 .../RegisteredOperatorBackendStateMetaInfo.java    | 16 ++++++++++++++++
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index edbd605..a2e49cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -636,7 +636,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                private PartitionableListState(PartitionableListState<S> 
toCopy) {
 
-                       this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
+                       this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
                }
 
                public void 
setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 42e68f3..7ebf1ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -66,7 +66,7 @@ public class HeapBroadcastState<K, V> implements 
BackendWritableBroadcastState<K
        }
 
        private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
-               this(toCopy.stateMetaInfo, toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
+               this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
index d462b34..7204cd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
@@ -52,6 +52,23 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
                this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
        }
 
+       public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
+
+               Preconditions.checkNotNull(copy);
+
+               this.name = copy.name;
+               this.assignmentMode = copy.assignmentMode;
+               this.keySerializer = copy.keySerializer.duplicate();
+               this.valueSerializer = copy.valueSerializer.duplicate();
+       }
+
+       /**
+        * Creates a deep copy of itself.
+        */
+       public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
+               return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+       }
+
        public String getName() {
                return name;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
index af289f9..a9adc8d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -57,6 +57,22 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
                this.assignmentMode = 
Preconditions.checkNotNull(assignmentMode);
        }
 
+       private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
+
+               Preconditions.checkNotNull(copy);
+
+               this.name = copy.name;
+               this.partitionStateSerializer = copy.partitionStateSerializer.duplicate();
+               this.assignmentMode = copy.assignmentMode;
+       }
+
+       /**
+        * Creates a deep copy of itself.
+        */
+       public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
+               return new RegisteredOperatorBackendStateMetaInfo<>(this);
+       }
+
        public String getName() {
                return name;
        }

Reply via email to