Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6156#discussion_r194790994
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 ---
    @@ -87,40 +83,14 @@ public RocksDBFoldingState(ColumnFamilyHandle 
columnFamily,
        }
     
        @Override
    -   public ACC get() {
    -           try {
    -                   writeCurrentKeyWithGroupAndNamespace();
    -                   byte[] key = keySerializationStream.toByteArray();
    -                   byte[] valueBytes = backend.db.get(columnFamily, key);
    -                   if (valueBytes == null) {
    -                           return null;
    -                   }
    -                   return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
    -           } catch (IOException | RocksDBException e) {
    -                   throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
    -           }
    +   public ACC get() throws IOException {
    +           return getInternal();
        }
     
        @Override
    -   public void add(T value) throws IOException {
    -           try {
    -                   writeCurrentKeyWithGroupAndNamespace();
    -                   byte[] key = keySerializationStream.toByteArray();
    -                   byte[] valueBytes = backend.db.get(columnFamily, key);
    -                   DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
    -                   if (valueBytes == null) {
    -                           keySerializationStream.reset();
    -                           
valueSerializer.serialize(foldFunction.fold(getDefaultValue(), value), out);
    -                           backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
    -                   } else {
    -                           ACC oldValue = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
    -                           ACC newValue = foldFunction.fold(oldValue, 
value);
    -                           keySerializationStream.reset();
    -                           valueSerializer.serialize(newValue, out);
    -                           backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
    -                   }
    -           } catch (Exception e) {
    -                   throw new RuntimeException("Error while adding data to 
RocksDB", e);
    -           }
    +   public void add(T value) throws Exception {
    +           ACC accumulator = getInternal();
    +           accumulator = accumulator == null ? getDefaultValue() : 
foldFunction.fold(accumulator, value);
    --- End diff --
    
    This seems not consistency with the previous version. Should this be
    ```java
    accumulator = foldFunction.fold(accumulator == null ? getDefaultValue() : 
accumulator, value);
    ```


---

Reply via email to