[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6156 ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195136079 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java --- @@ -31,4 +31,22 @@ * @param The type of elements in the state * @param The type of the resulting element in the state */ -public interface InternalAppendingState extends InternalKvState, AppendingState {} +public interface InternalAppendingState extends InternalKvState, AppendingState { --- End diff -- I had a second thought about this and I think just adding the methods only in `InternalAppendingState` might be a better choice in the end, because it feels only required here to have a way of manipulating the internal type. Sorry for that :) ---
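To make the shape of the proposal concrete, here is a minimal sketch of an appending-state interface that exposes the internally stored value. It is not the Flink interface: `AppendingStateSketch` and `SummingStateSketch` are hypothetical names, the real `InternalAppendingState` also extends `InternalKvState` and `AppendingState`, and the in-memory map only stands in for a state backend.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for the interface under review.
// IN = input type, SV = internally stored type, OUT = result type.
interface AppendingStateSketch<IN, SV, OUT> {
    // Returns the internally stored value for the current key, or null if absent.
    SV getInternal() throws Exception;

    // Overwrites the internally stored value for the current key.
    void updateInternal(SV valueToStore) throws Exception;

    void add(IN value) throws Exception;

    OUT get() throws Exception;
}

// Hypothetical in-memory implementation that sums integers per key.
final class SummingStateSketch implements AppendingStateSketch<Integer, Integer, Integer> {
    private final Map<String, Integer> table = new HashMap<>();
    private String currentKey = "default";

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    @Override
    public Integer getInternal() {
        return table.get(currentKey);
    }

    @Override
    public void updateInternal(Integer valueToStore) {
        table.put(currentKey, valueToStore);
    }

    @Override
    public void add(Integer value) {
        // add() is expressed purely in terms of the two internal accessors.
        Integer accumulator = getInternal();
        updateInternal(accumulator == null ? value : accumulator + value);
    }

    @Override
    public Integer get() {
        return getInternal();
    }
}
```

With the accessors on the interface, callers that need to manipulate the stored type directly can do so without knowing which backend holds it.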
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195120961 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java --- @@ -22,7 +22,7 @@ /** * The peer to the {@link AppendingState} in the internal state type hierarchy. - * + * --- End diff -- Please revert this. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195118823 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java --- @@ -30,7 +30,7 @@ * @param The type of the value. */ public class HeapValueState - extends AbstractHeapState> + extends AbstractHeapState --- End diff -- The same also holds for `HeapMapState` etc. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195118305 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java --- @@ -30,7 +30,7 @@ * @param The type of the value. */ public class HeapValueState - extends AbstractHeapState> + extends AbstractHeapState --- End diff -- You could also consider using the new `getInternal()` and `updateInternal()` methods inside the methods of this class to replace direct calls to the `stateTable`. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195116930 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java --- @@ -48,4 +48,4 @@ @Override public void release() { } -} \ No newline at end of file +} --- End diff -- Please revert this, because it produces a change in an unrelated class. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195116411 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java --- @@ -34,12 +33,10 @@ * @param The type of the input elements. * @param The type of the values in the state. * @param The type of the output elements. - * @param The type of State */ -public abstract class AbstractHeapMergingState - extends AbstractHeapState - implements InternalMergingState { - +public abstract class AbstractHeapMergingState + extends AbstractHeapState + implements InternalMergingState, org.apache.flink.runtime.state.internal.InternalAppendingState { --- End diff -- I would make this a regular import. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195114015 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -94,6 +97,15 @@ public RocksDBListState( return valueSerializer; } + @Override + public byte[] getSerializedValue( --- End diff -- I don't think this overriding makes sense. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195111820 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -237,4 +249,20 @@ public void addAll(List values) throws Exception { return keySerializationStream.toByteArray(); } + + @Override + public List getInternal() { + Iterable list = get(); + if (list == null) { + return null; + } + List collected = new ArrayList<>(); --- End diff -- We could currently also save the whole repacking if we change the signature of `Iterable get()` in this class to return `List`. However, I think in the long run it might be worth considering basing this class on `Iterable` instead of `List`, because we essentially only use iterable semantics. @aljoscha what do you think? ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195089054 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java --- @@ -31,4 +31,22 @@ * @param The type of elements in the state * @param The type of the resulting element in the state */ -public interface InternalAppendingState extends InternalKvState, AppendingState {} +public interface InternalAppendingState extends InternalKvState, AppendingState { --- End diff -- It almost seems to me that these methods are not truly specific to merging state and could simply become part of the `InternalKvState` interface, and the abstract implementations could also move directly to the respective abstract classes `AbstractRocksDBState` and `AbstractHeapState`. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r195019971 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -237,4 +249,20 @@ public void addAll(List values) throws Exception { return keySerializationStream.toByteArray(); } + + @Override + public List getInternal() { + Iterable list = get(); + if (list == null) { + return null; + } + List collected = new ArrayList<>(); --- End diff -- Maybe we could give this a safe initial capacity, e.g. `4`. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194952224 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java --- @@ -87,55 +87,14 @@ public RocksDBAggregatingState( @Override public R get() throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - if (valueBytes == null) { - return null; - } - - ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - return aggFunction.getResult(accumulator); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while retrieving value from RocksDB", e); - } + return aggFunction.getResult(getInternal()); } @Override public void add(T value) throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - keySerializationStream.reset(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - // deserialize the current accumulator, or create a blank one - ACC accumulator = valueBytes == null ? 
- aggFunction.createAccumulator() : - valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - - // aggregate the value into the accumulator - accumulator = aggFunction.add(value, accumulator); - - // serialize the new accumulator - final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - valueSerializer.serialize(accumulator, out); - - // write the new value to RocksDB - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while adding value to RocksDB", e); - } + ACC accumulator = getInternal(); + accumulator = accumulator == null ? aggFunction.createAccumulator() : accumulator; + updateInternal(aggFunction.add(value, accumulator)); --- End diff -- This is the same as for `FoldState` and `ReducingState`. Do you think this should be improved? I'm not sure, because serializing the key bytes does not seem that expensive in most use cases. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194951927 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java --- @@ -87,55 +87,14 @@ public RocksDBAggregatingState( @Override public R get() throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - if (valueBytes == null) { - return null; - } - - ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - return aggFunction.getResult(accumulator); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while retrieving value from RocksDB", e); - } + return aggFunction.getResult(getInternal()); } @Override public void add(T value) throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - keySerializationStream.reset(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - // deserialize the current accumulator, or create a blank one - ACC accumulator = valueBytes == null ? 
- aggFunction.createAccumulator() : - valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - - // aggregate the value into the accumulator - accumulator = aggFunction.add(value, accumulator); - - // serialize the new accumulator - final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - valueSerializer.serialize(accumulator, out); - - // write the new value to RocksDB - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while adding value to RocksDB", e); - } + ACC accumulator = getInternal(); + accumulator = accumulator == null ? aggFunction.createAccumulator() : accumulator; + updateInternal(aggFunction.add(value, accumulator)); --- End diff -- We currently have to serialize the key bytes twice; the previous version only needed to serialize them once. ---
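The trade-off raised here can be illustrated with a small counting sketch. Everything in it is hypothetical: a `HashMap` stands in for RocksDB, `serializeKey` stands in for `writeCurrentKeyWithGroupAndNamespace()` plus `toByteArray()`, and the aggregation is a plain integer sum. It only shows why composing `getInternal()` and `updateInternal()` serializes the key twice while the inlined version does so once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

final class KeySerializationCountSketch {
    private final Map<String, Integer> store = new HashMap<>();
    private final BinaryOperator<Integer> aggregate = Integer::sum;

    // Counts how often the (simulated) key serialization runs.
    int keySerializations = 0;

    // Hypothetical stand-in for serializing key, key group and namespace.
    private String serializeKey(String key) {
        keySerializations++;
        return "k:" + key;
    }

    Integer getInternal(String key) {
        return store.get(serializeKey(key));
    }

    void updateInternal(String key, Integer value) {
        store.put(serializeKey(key), value);
    }

    // Refactored shape: two helper calls, so the key is serialized twice.
    void addViaHelpers(String key, int value) {
        Integer accumulator = getInternal(key);
        updateInternal(key, aggregate.apply(accumulator == null ? 0 : accumulator, value));
    }

    // Previous shape: the serialized key is reused for the read and the write.
    void addInline(String key, int value) {
        String serializedKey = serializeKey(key);
        Integer accumulator = store.get(serializedKey);
        store.put(serializedKey, aggregate.apply(accumulator == null ? 0 : accumulator, value));
    }
}
```

Whether the extra serialization matters depends on the key serializer; the sketch makes no claim about actual cost.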
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194951490 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java --- @@ -0,0 +1,71 @@ +package org.apache.flink.contrib.streaming.state; --- End diff -- RAT problem. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194951510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java --- @@ -0,0 +1,48 @@ +package org.apache.flink.runtime.state.heap; --- End diff -- RAT problem. ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194951373 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java --- @@ -0,0 +1,71 @@ +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.internal.InternalAppendingState; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +import java.io.IOException; + +abstract class AbstractRocksDBAppendingState + extends AbstractRocksDBState + implements InternalAppendingState { + + /** +* Creates a new RocksDB backed state. --- End diff -- typo: `backed` -> `backend` ---
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194947600 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java --- @@ -0,0 +1,71 @@ +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.internal.InternalAppendingState; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +import java.io.IOException; + +abstract class AbstractRocksDBAppendingState + extends AbstractRocksDBState + implements InternalAppendingState { + + /** +* Creates a new RocksDB backed state. +* +* @param columnFamilyThe RocksDB column family that this state is associated to. +* @param namespaceSerializer The serializer for the namespace. +* @param valueSerializer The serializer for the state. +* @param defaultValueThe default value for the state. +* @param backend The backend for which this state is bind to. 
+*/ + protected AbstractRocksDBAppendingState( + ColumnFamilyHandle columnFamily, + TypeSerializer namespaceSerializer, + TypeSerializer valueSerializer, + SV defaultValue, + RocksDBKeyedStateBackend backend) { + super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend); + } + + @Override + public SV getInternal() throws IOException { + try { + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); + byte[] valueBytes = backend.db.get(columnFamily, key); + if (valueBytes == null) { + return null; + } + return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); + } catch (IOException | RocksDBException e) { + throw new IOException("Error while retrieving data from RocksDB", e); + } + } + + @Override + public void updateInternal(SV valueToStore) throws IOException { + try { + // prepare the current key and namespace for RocksDB lookup + writeCurrentKeyWithGroupAndNamespace(); + final byte[] key = keySerializationStream.toByteArray(); + keySerializationStream.reset(); + + // serialize new value + final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); + valueSerializer.serialize(valueToStore, out); + + // write the new value to RocksDB + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); + } + catch (IOException | RocksDBException e) { + throw new IOException("Error while adding value to RocksDB", e); --- End diff -- nit: throwing `IOException` here seems a bit odd. Could this be replaced by `FlinkRuntimeException`? ---
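The two options in this nit differ only in whether the wrapper exception is checked. The sketch below contrasts them under stated assumptions: `StateAccessException` is a hypothetical unchecked exception used in place of `FlinkRuntimeException` so that the example has no Flink dependency, and `StoreWrite` is a hypothetical functional interface standing in for the RocksDB write.

```java
import java.io.IOException;

// Hypothetical unchecked wrapper, used here instead of FlinkRuntimeException.
final class StateAccessException extends RuntimeException {
    StateAccessException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class ExceptionWrappingSketch {
    // Hypothetical low-level write that may fail with a checked exception.
    interface StoreWrite {
        void run() throws IOException;
    }

    // Shape in the diff: rethrow as a checked IOException, so callers must declare it.
    static void updateChecked(StoreWrite write) throws IOException {
        try {
            write.run();
        } catch (IOException e) {
            throw new IOException("Error while adding value to RocksDB", e);
        }
    }

    // Alternative from the review: wrap in an unchecked exception and keep the cause.
    static void updateUnchecked(StoreWrite write) {
        try {
            write.run();
        } catch (IOException e) {
            throw new StateAccessException("Error while adding value to RocksDB", e);
        }
    }
}
```

In both variants the original failure stays reachable through `getCause()`; the unchecked variant simply keeps `throws` clauses out of the state method signatures.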
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194946663 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java --- @@ -87,55 +87,14 @@ public RocksDBAggregatingState( @Override public R get() throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - if (valueBytes == null) { - return null; - } - - ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - return aggFunction.getResult(accumulator); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while retrieving value from RocksDB", e); - } + return aggFunction.getResult(getInternal()); --- End diff -- This might throw an NPE, because `getInternal` may return `null`. ---
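One way to restore the `null` check that the removed code performed is to guard the accumulator before asking for the result. The following is only a sketch: `NullSafeResultSketch` and `resultOrNull` are hypothetical names, and a `java.util.function.Function` stands in for `aggFunction.getResult`.

```java
import java.util.function.Function;

final class NullSafeResultSketch {
    // Returns null for absent state instead of handing a null accumulator to getResult.
    static <ACC, R> R resultOrNull(ACC accumulator, Function<ACC, R> getResult) {
        return accumulator == null ? null : getResult.apply(accumulator);
    }
}
```

This mirrors the behavior of the deleted `if (valueBytes == null) { return null; }` branch, under the assumption that `getInternal()` returns `null` when nothing is stored.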
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194795632 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -237,4 +242,16 @@ public void addAll(List values) throws Exception { return keySerializationStream.toByteArray(); } + + @Override + public List getInternal() { + List list = new ArrayList<>(); + get().forEach(list::add); --- End diff -- There could be an NPE here, because `get()` may return `null`. ---
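A null-safe version of this repacking could look like the sketch below. `IterableToListSketch` and `toListOrNull` are hypothetical names; the only assumption carried over from the review is that the source `Iterable` may be `null` when no state exists.

```java
import java.util.ArrayList;
import java.util.List;

final class IterableToListSketch {
    // Copies the elements into a new list, propagating null for absent state.
    static <V> List<V> toListOrNull(Iterable<V> values) {
        if (values == null) {
            return null;
        }
        List<V> collected = new ArrayList<>();
        values.forEach(collected::add);
        return collected;
    }
}
```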
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194796355 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -94,6 +94,11 @@ public RocksDBListState( return valueSerializer; } + @Override + public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer safeKeySerializer, TypeSerializer safeNamespaceSerializer, TypeSerializer> safeValueSerializer) throws Exception { + return new byte[0]; --- End diff -- Maybe we could use a static field to avoid allocating a new `byte[0]` every time. ---
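The suggestion amounts to sharing one zero-length array, which is safe because an empty array has no elements to mutate. A minimal sketch, with the hypothetical names `EmptyBytesSketch` and `emptySerializedValue`:

```java
final class EmptyBytesSketch {
    // A zero-length array cannot be modified, so one shared instance is enough.
    private static final byte[] EMPTY = new byte[0];

    static byte[] emptySerializedValue() {
        return EMPTY;
    }
}
```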
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194790994 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java --- @@ -87,40 +83,14 @@ public RocksDBFoldingState(ColumnFamilyHandle columnFamily, } @Override - public ACC get() { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - if (valueBytes == null) { - return null; - } - return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while retrieving data from RocksDB", e); - } + public ACC get() throws IOException { + return getInternal(); } @Override - public void add(T value) throws IOException { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - if (valueBytes == null) { - keySerializationStream.reset(); - valueSerializer.serialize(foldFunction.fold(getDefaultValue(), value), out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } else { - ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - ACC newValue = foldFunction.fold(oldValue, value); - keySerializationStream.reset(); - valueSerializer.serialize(newValue, out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - } catch (Exception e) { - throw new RuntimeException("Error while adding data to RocksDB", e); - } + public void add(T value) throws Exception { + ACC accumulator = getInternal(); + accumulator = 
accumulator == null ? getDefaultValue() : foldFunction.fold(accumulator, value); --- End diff -- This does not seem consistent with the previous version. Should this be ```java accumulator = foldFunction.fold(accumulator == null ? getDefaultValue() : accumulator, value); ``` ---
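The difference between the two expressions shows up only for the first element added to empty state. The sketch below reproduces both shapes with a `BiFunction` standing in for `foldFunction.fold`; `FoldOrderSketch`, `addAsSubmitted` and `addAsSuggested` are hypothetical names.

```java
import java.util.function.BiFunction;

final class FoldOrderSketch {
    // Shape from the diff: with no stored accumulator the default is returned
    // as-is, so the incoming value is never folded in.
    static <ACC, T> ACC addAsSubmitted(
            ACC stored, ACC defaultValue, T value, BiFunction<ACC, T, ACC> fold) {
        return stored == null ? defaultValue : fold.apply(stored, value);
    }

    // Shape suggested in the review: pick the starting accumulator first,
    // then always fold the incoming value into it.
    static <ACC, T> ACC addAsSuggested(
            ACC stored, ACC defaultValue, T value, BiFunction<ACC, T, ACC> fold) {
        return fold.apply(stored == null ? defaultValue : stored, value);
    }
}
```

For a summing fold with default `10` and first value `5`, the submitted shape yields `10` and the suggested shape yields `15`; once state exists, both agree.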
[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...
GitHub user azagrebin opened a pull request: https://github.com/apache/flink/pull/6156

[FLINK-9572] Extend InternalAppendingState with internal stored state access

## What is the purpose of the change

Extend InternalAppendingState with get and update methods for internal stored state. Implement them in concrete states in backends.

## Brief change log

- *InternalAppendingState* has now methods *getInternal* and *updateInternal*
- Heap and Rocksdb merging states implement the methods

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage. It should be covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/azagrebin/flink FLINK-9572

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6156.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6156

commit f235b327d7db370b6e4457ecc863cbf2626b4488
Author: Andrey Zagrebin
Date: 2018-06-12T15:24:20Z

    [FLINK-9572] Extend InternalAppendingState with internal stored state access

---