Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194790994 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java --- @@ -87,40 +83,14 @@ public RocksDBFoldingState(ColumnFamilyHandle columnFamily, } @Override - public ACC get() { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - if (valueBytes == null) { - return null; - } - return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while retrieving data from RocksDB", e); - } + public ACC get() throws IOException { + return getInternal(); } @Override - public void add(T value) throws IOException { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - if (valueBytes == null) { - keySerializationStream.reset(); - valueSerializer.serialize(foldFunction.fold(getDefaultValue(), value), out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } else { - ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - ACC newValue = foldFunction.fold(oldValue, value); - keySerializationStream.reset(); - valueSerializer.serialize(newValue, out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - } catch (Exception e) { - throw new RuntimeException("Error while adding data to RocksDB", e); - } + public void add(T value) throws Exception { + ACC accumulator = getInternal(); + accumulator = accumulator == null ? getDefaultValue() : foldFunction.fold(accumulator, value); --- End diff -- This seems not consistency with the previous version. Should this be ```java accumulator = foldFunction.fold(accumulator == null ? getDefaultValue() : accumulator, value); ```
---