GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6156#discussion_r194951927
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java ---
    @@ -87,55 +87,14 @@ public RocksDBAggregatingState(
     
        @Override
        public R get() throws IOException {
    -           try {
    -                   // prepare the current key and namespace for RocksDB lookup
    -                   writeCurrentKeyWithGroupAndNamespace();
    -                   final byte[] key = keySerializationStream.toByteArray();
    -
    -                   // get the current value
    -                   final byte[] valueBytes = backend.db.get(columnFamily, key);
    -
    -                   if (valueBytes == null) {
    -                           return null;
    -                   }
    -
    -                   ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
    -                   return aggFunction.getResult(accumulator);
    -           }
    -           catch (IOException | RocksDBException e) {
    -                   throw new IOException("Error while retrieving value from RocksDB", e);
    -           }
    +           return aggFunction.getResult(getInternal());
        }
     
        @Override
        public void add(T value) throws IOException {
    -           try {
    -                   // prepare the current key and namespace for RocksDB lookup
    -                   writeCurrentKeyWithGroupAndNamespace();
    -                   final byte[] key = keySerializationStream.toByteArray();
    -                   keySerializationStream.reset();
    -
    -                   // get the current value
    -                   final byte[] valueBytes = backend.db.get(columnFamily, key);
    -
    -                   // deserialize the current accumulator, or create a blank one
    -                   ACC accumulator = valueBytes == null ?
    -                                   aggFunction.createAccumulator() :
    -                                   valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
    -
    -                   // aggregate the value into the accumulator
    -                   accumulator = aggFunction.add(value, accumulator);
    -
    -                   // serialize the new accumulator
    -                   final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    -                   valueSerializer.serialize(accumulator, out);
    -
    -                   // write the new value to RocksDB
    -                   backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
    -           }
    -           catch (IOException | RocksDBException e) {
    -                   throw new IOException("Error while adding value to RocksDB", e);
    -           }
    +           ACC accumulator = getInternal();
    +           accumulator = accumulator == null ? aggFunction.createAccumulator() : accumulator;
    +           updateInternal(aggFunction.add(value, accumulator));
    --- End diff --
    
    With this change, add() has to serialize the key bytes twice (presumably once in getInternal() and once in updateInternal()), whereas the previous version only needed to serialize them once.
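
    To make the cost concrete, here is a minimal, self-contained sketch. It does not use Flink's or RocksDB's actual classes: `KeySerializationSketch`, the in-memory map, and the method names `addViaHelpers` and `addWithKeyReuse` are all hypothetical stand-ins, and it assumes that `getInternal()` and `updateInternal()` each serialize the key on their own. It only counts how often the key is serialized on each path.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a RocksDB-backed aggregating state; not Flink code. */
public class KeySerializationSketch {
    // Serialized key -> accumulator. ByteBuffer compares by content, unlike byte[].
    private final Map<ByteBuffer, Long> store = new HashMap<>();

    // Counts key serializations so the two code paths can be compared.
    int keySerializations = 0;

    private ByteBuffer serializeKey(String key) {
        keySerializations++;
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    // Assumed shape of the shared helpers: each one serializes the key itself.
    Long getInternal(String key) {
        return store.get(serializeKey(key));
    }

    void updateInternal(String key, long accumulator) {
        store.put(serializeKey(key), accumulator);
    }

    // Mirrors the refactored add(): read and write go through the helpers.
    void addViaHelpers(String key, long value) {
        Long accumulator = getInternal(key);
        updateInternal(key, (accumulator == null ? 0L : accumulator) + value);
    }

    // Mirrors the previous add(): serialize once, reuse the bytes for get and put.
    void addWithKeyReuse(String key, long value) {
        ByteBuffer keyBytes = serializeKey(key);
        Long accumulator = store.get(keyBytes);
        store.put(keyBytes, (accumulator == null ? 0L : accumulator) + value);
    }

    public static void main(String[] args) {
        KeySerializationSketch viaHelpers = new KeySerializationSketch();
        viaHelpers.addViaHelpers("k", 1L);
        System.out.println("via helpers: " + viaHelpers.keySerializations);

        KeySerializationSketch keyReuse = new KeySerializationSketch();
        keyReuse.addWithKeyReuse("k", 1L);
        System.out.println("key reuse:   " + keyReuse.keySerializations);
    }
}
```

    One possible way to keep the shared helpers without paying twice would be a variant that accepts already-serialized key bytes, but whether that fits the existing base class is a question for the PR author.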

