Repository: flink
Updated Branches:
  refs/heads/master 390d36132 -> c11f11359
[FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend.

This closes #5930.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c11f1135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c11f1135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c11f1135

Branch: refs/heads/master
Commit: c11f11359b8915533ad886015d57298e3daeb821
Parents: 390d361
Author: sihuazhou <[email protected]>
Authored: Fri Apr 27 20:42:33 2018 +0800
Committer: Stefan Richter <[email protected]>
Committed: Wed May 2 12:18:36 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/DefaultOperatorStateBackend.java |  2 +-
 .../flink/runtime/state/HeapBroadcastState.java    |  2 +-
 .../RegisteredBroadcastBackendStateMetaInfo.java   | 17 +++++++++++++++++
 .../RegisteredOperatorBackendStateMetaInfo.java    | 16 ++++++++++++++++
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index edbd605..a2e49cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -636,7 +636,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 		private PartitionableListState(PartitionableListState<S> toCopy) {
 
-			this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
+			this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
 		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 42e68f3..7ebf1ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -66,7 +66,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	}
 
 	private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
-		this(toCopy.stateMetaInfo, toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
+		this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
index d462b34..7204cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
@@ -52,6 +52,23 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
 		this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
 	}
 
+	public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
+
+		Preconditions.checkNotNull(copy);
+
+		this.name = copy.name;
+		this.assignmentMode = copy.assignmentMode;
+		this.keySerializer = copy.keySerializer.duplicate();
+		this.valueSerializer = copy.valueSerializer.duplicate();
+	}
+
+	/**
+	 * Creates a deep copy of the itself.
+	 */
+	public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
+		return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+	}
+
 	public String getName() {
 		return name;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
index af289f9..a9adc8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -57,6 +57,22 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
 		this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
 	}
 
+	private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
+
+		Preconditions.checkNotNull(copy);
+
+		this.name = copy.name;
+		this.partitionStateSerializer = copy.partitionStateSerializer.duplicate();
+		this.assignmentMode = copy.assignmentMode;
+	}
+
+	/**
+	 * Creates a deep copy of the itself.
+	 */
+	public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
+		return new RegisteredOperatorBackendStateMetaInfo<>(this);
+	}
+
 	public String getName() {
 		return name;
 	}
