Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6156#discussion_r194946663
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java ---
    @@ -87,55 +87,14 @@ public RocksDBAggregatingState(
     
        @Override
        public R get() throws IOException {
    -           try {
    -                   // prepare the current key and namespace for RocksDB lookup
    -                   writeCurrentKeyWithGroupAndNamespace();
    -                   final byte[] key = keySerializationStream.toByteArray();
    -
    -                   // get the current value
    -                   final byte[] valueBytes = backend.db.get(columnFamily, key);
    -
    -                   if (valueBytes == null) {
    -                           return null;
    -                   }
    -
    -                   ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
    -                   return aggFunction.getResult(accumulator);
    -           }
    -           catch (IOException | RocksDBException e) {
    -                   throw new IOException("Error while retrieving value from RocksDB", e);
    -           }
    +           return aggFunction.getResult(getInternal());
    --- End diff --
    
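    This might throw an `NPE`, because `getInternal()` can return `null`. The removed code returned `null` early when no value was stored, but the new line passes the result straight to `aggFunction.getResult(...)`.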
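
One possible shape for the guard, as a minimal standalone sketch rather than the actual Flink code. `NullSafeGetSketch` and `getOrNull` are hypothetical names, and a plain `java.util.function.Function` stands in for `aggFunction.getResult`:

```java
import java.util.function.Function;

public class NullSafeGetSketch {

    /**
     * Hypothetical stand-in for get(): returns null when no accumulator is
     * stored, instead of handing null to the result function.
     */
    static <ACC, R> R getOrNull(ACC accumulator, Function<ACC, R> getResult) {
        if (accumulator == null) {
            // Mirrors the early return of the removed code path.
            return null;
        }
        return getResult.apply(accumulator);
    }

    public static void main(String[] args) {
        // Assumed toy accumulator: a one-element array holding a running sum.
        Function<long[], Long> getResult = acc -> acc[0];

        System.out.println(getOrNull(new long[] {42L}, getResult));
        System.out.println(getOrNull(null, getResult));
    }
}
```

In `get()` this would amount to reading `getInternal()` into a local variable and returning `null` when it is `null`, which keeps the behavior of the removed code.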


---
