Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5281#discussion_r161694419
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
    @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
                                byte[] key = keySerializationStream.toByteArray();
    -                           DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
     
    -                           List<byte[]> bytes = new ArrayList<>(values.size());
    -                           for (V value : values) {
    -                                   keySerializationStream.reset();
    -                                   valueSerializer.serialize(value, out);
    -                                   bytes.add(keySerializationStream.toByteArray());
    +                           byte[] premerge = getPreMergedValue(values);
    +                           if (premerge != null) {
    +                                   backend.db.put(columnFamily, writeOptions, key, premerge);
    +                           } else {
    +                                   throw new IOException("Failed pre-merge values in update()");
                                }
    +                   } catch (IOException | RocksDBException e) {
    +                           throw new RuntimeException("Error while updating data to RocksDB", e);
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void addAll(List<V> values) throws Exception {
    +           if (values != null && !values.isEmpty()) {
    +                   try {
    +                           writeCurrentKeyWithGroupAndNamespace();
    +                           byte[] key = keySerializationStream.toByteArray();
     
    -                           byte[] premerge = MergeUtils.merge(bytes);
    +                           byte[] premerge = getPreMergedValue(values);
                                if (premerge != null) {
    -                                   backend.db.put(columnFamily, writeOptions, key, premerge);
    +                                   backend.db.merge(columnFamily, writeOptions, key, premerge);
                                } else {
    -                                   throw new IOException("Failed pre-merge values");
    +                                   throw new IOException("Failed pre-merge values in addAll()");
                                }
                        } catch (IOException | RocksDBException e) {
                                throw new RuntimeException("Error while updating data to RocksDB", e);
                        }
                }
        }
    +
    +   private byte[] getPreMergedValue(List<V> values) throws IOException {
    +           DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    +
    +           List<byte[]> bytes = new ArrayList<>(values.size());
    --- End diff --
    
    Why not just serialize all objects into the stream and write the RocksDB 
separator byte between the serialized objects? This could improve performance 
(fewer temporary copies and objects) and might be equally readable, or more so.
    
    From a performance point of view, even `#toByteArray()` results in an 
unnecessary copy - we could pass the internal array with offset + len directly 
to our RocksDB insert.
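    
    A rough sketch of the first suggestion (simplified and self-contained: 
`Serializer` and `preMerge` are stand-ins for Flink's `TypeSerializer` and the 
PR's `getPreMergedValue` helper, and the `','` delimiter assumes the default 
delimiter of RocksDB's `StringAppendOperator`):
    
    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;
    
    public class PreMergeSketch {
    
        // Assumed delimiter; RocksDB's StringAppendOperator defaults to ','.
        private static final byte DELIMITER = ',';
    
        // Simplified stand-in for Flink's TypeSerializer<V>.
        interface Serializer<V> {
            void serialize(V value, DataOutputStream out) throws IOException;
        }
    
        // Serialize all values into a single stream, writing the separator
        // byte between entries, instead of collecting a per-value byte[]
        // copy and merging the copies afterwards.
        static <V> byte[] preMerge(List<V> values, Serializer<V> serializer)
                throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            boolean first = true;
            for (V value : values) {
                if (!first) {
                    baos.write(DELIMITER);
                }
                first = false;
                serializer.serialize(value, out);
            }
            out.flush();
            return baos.toByteArray();
        }
    
        public static void main(String[] args) throws IOException {
            byte[] merged = preMerge(List.of("a", "b"),
                    (v, out) -> out.writeBytes(v));
            System.out.println(new String(merged)); // a,b
        }
    }
    ```
    
    This avoids the intermediate `List<byte[]>` entirely; only one backing 
buffer is filled and handed to the RocksDB call.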
    
    What do you think?

