[ https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15899685#comment-15899685 ]
ASF GitHub Bot commented on FLINK-5715:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3466#discussion_r104690326
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java ---
@@ -68,62 +63,29 @@ public HeapReducingState(
@Override
public V get() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, V>> namespaceMap =
-     stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- return keyedMap.get(backend.<K>getCurrentKey());
+ return stateTable.get(currentNamespace);
}
@Override
public void add(V value) throws IOException {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+ final N namespace = currentNamespace;
if (value == null) {
clear();
return;
}
- Map<N, Map<K, V>> namespaceMap =
-     stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
+ final StateTable<K, N, V> map = stateTable;
+ final V currentValue = map.putAndGetOld(namespace, value);
--- End diff --
I think this leads to one more table lookup than the old version, right?
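The diff is cut off right after `putAndGetOld`, so the rest of the new `add()` is not visible here. As a hypothetical illustration of the lookup cost the comment asks about, the sketch below uses `java.util.HashMap` as a stand-in for `StateTable` (an assumption; none of `StateTable`'s real methods are modeled) and counts how often the namespace key is hashed. Storing the new value, getting the old one back, and then storing the reduced value costs two lookups per `add()` once a value exists, whereas `Map.merge` performs the same reduction in one. The class and method names are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

final class ReducingAddLookups {

    // Hypothetical namespace key that counts hashCode() calls. java.util.HashMap
    // hashes the key exactly once per get/put/merge, so the counter equals the
    // number of table lookups.
    static final class CountingKey {
        static int hashCalls = 0;
        private final String name;

        CountingKey(String name) { this.name = name; }

        @Override public int hashCode() { hashCalls++; return name.hashCode(); }

        @Override public boolean equals(Object o) {
            return o instanceof CountingKey && ((CountingKey) o).name.equals(name);
        }
    }

    // Strategy A (put-and-get-old): store the new value, receive the previous one,
    // then reduce and store again. Two lookups whenever a value already exists.
    static <K, V> void addPutAndGetOld(Map<K, V> table, K ns, V value, BinaryOperator<V> reduce) {
        V old = table.put(ns, value);
        if (old != null) {
            table.put(ns, reduce.apply(old, value));
        }
    }

    // Strategy B (single lookup): Map.merge locates the entry once and either
    // inserts the value or replaces it with reduce(old, value).
    static <K, V> void addMerge(Map<K, V> table, K ns, V value, BinaryOperator<V> reduce) {
        table.merge(ns, value, reduce);
    }

    // Adds 1..n under one namespace and returns {reducedValue, lookups}.
    static long[] run(boolean useMerge, int n) {
        Map<CountingKey, Long> table = new HashMap<>();
        CountingKey ns = new CountingKey("window-1");
        BinaryOperator<Long> sum = Long::sum;
        CountingKey.hashCalls = 0;
        for (long i = 1; i <= n; i++) {
            if (useMerge) {
                addMerge(table, ns, i, sum);
            } else {
                addPutAndGetOld(table, ns, i, sum);
            }
        }
        long lookups = CountingKey.hashCalls; // captured before the final get()
        return new long[] {table.get(ns), lookups};
    }

    public static void main(String[] args) {
        long[] a = run(false, 3);
        long[] b = run(true, 3);
        System.out.println("put-and-get-old: value=" + a[0] + ", lookups=" + a[1]);
        System.out.println("merge:           value=" + b[0] + ", lookups=" + b[1]);
    }
}
```

Running `main` prints `value=6, lookups=5` for the put-and-get-old strategy and `value=6, lookups=3` for `merge`. Whether `StateTable` can offer a comparable single-lookup operation is a design question this sketch does not settle.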
> Asynchronous snapshotting for HeapKeyedStateBackend
> ---------------------------------------------------
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for
> many users in production. Their jobs cannot tolerate stopped processing for
> the time it takes to write gigabytes of data from memory to disk.
> Asynchronous snapshots would be a solution to this problem. The challenge for
> the implementation is coming up with a copy-on-write scheme for the in-memory
> hash maps that build the foundation of this backend. After taking a closer
> look, this problem is twofold. First, providing CoW semantics for the hashmap
> itself, as a mutable structure, thereby avoiding costly locking or blocking
> where possible. Second, CoW for the mutable value objects, e.g. through
> cloning via serializers.
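The first half of the problem, CoW semantics for the hash map itself, can be illustrated with a minimal, hypothetical sketch; it is not Flink's implementation. It assumes a single writer thread, a fixed bucket count (no resizing), and immutable values. Taking a snapshot only marks the current bucket array as shared. The first write after that copies the array shallowly, and every write rebuilds only the part of a chain it changes, so the snapshot keeps seeing the old nodes without any locking. The full-array copy is a simplification; a production design would likely copy more lazily. `CowHashMap` and `Snapshot` are invented names.

```java
import java.util.Objects;

// Hypothetical copy-on-write hash map: one writer thread, fixed bucket count
// (no resizing), immutable values. Not Flink's actual StateTable.
final class CowHashMap<K, V> {

    // Chain nodes are never mutated; writes rebuild the part of a chain they change.
    private static final class Node<K, V> {
        final K key; final V value; final Node<K, V> next;
        Node(K key, V value, Node<K, V> next) { this.key = key; this.value = value; this.next = next; }
    }

    // Read-only view for an asynchronous snapshot writer. The map never mutates
    // an array once a snapshot references it, so reads need no lock.
    static final class Snapshot<K, V> {
        private final Node<K, V>[] buckets;
        private Snapshot(Node<K, V>[] buckets) { this.buckets = buckets; }
        V get(K key) { return find(buckets, key); }
    }

    private Node<K, V>[] buckets;
    private boolean shared; // true while a snapshot references `buckets`

    @SuppressWarnings("unchecked")
    CowHashMap(int capacity) { buckets = (Node<K, V>[]) new Node[capacity]; }

    // O(1): taking a snapshot copies nothing; it only marks the array as shared.
    Snapshot<K, V> snapshot() {
        shared = true;
        return new Snapshot<>(buckets);
    }

    V get(K key) { return find(buckets, key); }

    // Inserts or replaces `key` and returns the previous value (or null).
    V put(K key, V value) {
        if (shared) {
            // First write after a snapshot: shallow-copy the bucket array so the
            // snapshot keeps the old one. The nodes themselves stay shared.
            buckets = buckets.clone();
            shared = false;
        }
        int i = index(buckets, key);
        V old = find(buckets, key);
        // Prepend the new node to a copy of the chain without the old entry.
        buckets[i] = new Node<>(key, value, without(buckets[i], key));
        return old;
    }

    // Returns the chain with `key` removed; only nodes before the match are copied.
    private static <K, V> Node<K, V> without(Node<K, V> head, K key) {
        if (head == null) {
            return null;
        }
        if (Objects.equals(head.key, key)) {
            return head.next;
        }
        Node<K, V> rest = without(head.next, key);
        return rest == head.next ? head : new Node<K, V>(head.key, head.value, rest);
    }

    private static int index(Node<?, ?>[] table, Object key) {
        return (Objects.hashCode(key) & 0x7fffffff) % table.length;
    }

    private static <K, V> V find(Node<K, V>[] table, K key) {
        for (Node<K, V> n = table[index(table, key)]; n != null; n = n.next) {
            if (Objects.equals(n.key, key)) {
                return n.value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        CowHashMap<String, Integer> map = new CowHashMap<>(16);
        map.put("a", 1);
        map.put("b", 2);
        Snapshot<String, Integer> snap = map.snapshot();
        map.put("a", 10); // triggers the array copy; the snapshot is unaffected
        map.put("c", 3);
        System.out.println("live: a=" + map.get("a") + ", c=" + map.get("c"));
        System.out.println("snapshot: a=" + snap.get("a") + ", c=" + snap.get("c"));
    }
}
```

Running `main` prints `live: a=10, c=3` followed by `snapshot: a=1, c=null`: writes made after `snapshot()` are invisible to the snapshot. Mutable values would additionally need per-entry copies (the issue's second point, e.g. cloning through a serializer), which this sketch leaves out.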
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)