Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6156#discussion_r194947600
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 ---
    @@ -0,0 +1,71 @@
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.internal.InternalAppendingState;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.RocksDBException;
    +
    +import java.io.IOException;
    +
    +abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT, S extends 
State>
    +   extends AbstractRocksDBState<K, N, SV, S>
    +   implements InternalAppendingState<K, N, IN, SV, OUT> {
    +
    +   /**
    +    * Creates a new RocksDB backed state.
    +    *
    +    * @param columnFamily        The RocksDB column family that this state 
is associated to.
    +    * @param namespaceSerializer The serializer for the namespace.
    +    * @param valueSerializer     The serializer for the state.
    +    * @param defaultValue        The default value for the state.
    +    * @param backend             The backend for which this state is bind 
to.
    +    */
    +   protected AbstractRocksDBAppendingState(
    +           ColumnFamilyHandle columnFamily,
    +           TypeSerializer<N> namespaceSerializer,
    +           TypeSerializer<SV> valueSerializer,
    +           SV defaultValue,
    +           RocksDBKeyedStateBackend<K> backend) {
    +           super(columnFamily, namespaceSerializer, valueSerializer, 
defaultValue, backend);
    +   }
    +
    +   @Override
    +   public SV getInternal() throws IOException {
    +           try {
    +                   writeCurrentKeyWithGroupAndNamespace();
    +                   byte[] key = keySerializationStream.toByteArray();
    +                   byte[] valueBytes = backend.db.get(columnFamily, key);
    +                   if (valueBytes == null) {
    +                           return null;
    +                   }
    +                   return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
    +           } catch (IOException | RocksDBException e) {
    +                   throw new IOException("Error while retrieving data from 
RocksDB", e);
    +           }
    +   }
    +
    +   @Override
    +   public void updateInternal(SV valueToStore) throws IOException {
    +           try {
    +                   // prepare the current key and namespace for RocksDB 
lookup
    +                   writeCurrentKeyWithGroupAndNamespace();
    +                   final byte[] key = keySerializationStream.toByteArray();
    +                   keySerializationStream.reset();
    +
    +                   // serialize new value
    +                   final DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
    +                   valueSerializer.serialize(valueToStore, out);
    +
    +                   // write the new value to RocksDB
    +                   backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
    +           }
    +           catch (IOException | RocksDBException e) {
    +                   throw new IOException("Error while adding value to 
RocksDB", e);
    --- End diff --
    
    nit: throws `IOException` seems like a bit weird.. could this be replaced 
by the `FlinkRuntimeException`?


---

Reply via email to