[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330515#comment-16330515 ]

ASF GitHub Bot commented on FLINK-7938:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5281#discussion_r162343086
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
    @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
                        byte[] key = keySerializationStream.toByteArray();
    -                           DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
     
    -                           List<byte[]> bytes = new ArrayList<>(values.size());
    -                           for (V value : values) {
    -                                   keySerializationStream.reset();
    -                                   valueSerializer.serialize(value, out);
    -                                   bytes.add(keySerializationStream.toByteArray());
    +                           byte[] premerge = getPreMergedValue(values);
    +                           if (premerge != null) {
    +                                   backend.db.put(columnFamily, writeOptions, key, premerge);
    +                           } else {
    +                                   throw new IOException("Failed pre-merge values in update()");
                                }
    +                   } catch (IOException | RocksDBException e) {
    +                           throw new RuntimeException("Error while updating data to RocksDB", e);
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void addAll(List<V> values) throws Exception {
    +           if (values != null && !values.isEmpty()) {
    +                   try {
    +                           writeCurrentKeyWithGroupAndNamespace();
    +                           byte[] key = keySerializationStream.toByteArray();
     
    -                           byte[] premerge = MergeUtils.merge(bytes);
    +                           byte[] premerge = getPreMergedValue(values);
                                if (premerge != null) {
    -                                   backend.db.put(columnFamily, writeOptions, key, premerge);
    +                                   backend.db.merge(columnFamily, writeOptions, key, premerge);
                                } else {
    -                                   throw new IOException("Failed pre-merge values");
    +                                   throw new IOException("Failed pre-merge values in addAll()");
                                }
                        } catch (IOException | RocksDBException e) {
                                throw new RuntimeException("Error while updating data to RocksDB", e);
                        }
                }
        }
    +
    +   private byte[] getPreMergedValue(List<V> values) throws IOException {
    +           DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    +
    +           List<byte[]> bytes = new ArrayList<>(values.size());
    --- End diff --
    
    Interesting, do you have an idea why it did not work? I think it should be possible. In general, I am not a big fan of changing this code twice when we already plan to overhaul that part anyway, but we can do it this time if it makes your life easier.
    
    Sorry that we could not get it done for the meetup, but I was blocked by another important matter :-(
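The diff moves the serialization loop into a helper, `getPreMergedValue()`, whose body is cut off in the excerpt. The idea it supports: serialize each element, join the results into one byte array, and write that array once, with `put` in `update()` (replace the stored list) and `merge` in `addAll()` (append to it). Below is a minimal, self-contained sketch of that idea, not Flink's code: `PreMergeSketch`, `FakeStore` and the helper methods are hypothetical, an in-memory map stands in for RocksDB, and the one-byte `,` delimiter is an assumption modelled on RocksDB's string-append merge operator.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of pre-merging list elements; not Flink's implementation. */
public class PreMergeSketch {

    // Assumed delimiter, modelled on RocksDB's string-append merge operator.
    static final byte DELIMITER = ',';

    /** Joins already-serialized elements as e1,e2,...,en (no trailing delimiter). */
    static byte[] preMerge(List<byte[]> serialized) {
        if (serialized == null || serialized.isEmpty()) {
            return null; // callers treat null as "nothing to write"
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean first = true;
        for (byte[] element : serialized) {
            if (!first) {
                out.write(DELIMITER);
            }
            out.write(element, 0, element.length);
            first = false;
        }
        return out.toByteArray();
    }

    /** In-memory stand-in for a key-value store with put and append-style merge. */
    static final class FakeStore {
        private final Map<String, byte[]> data = new HashMap<>();

        // put: overwrite the stored value (what update() needs)
        void put(String key, byte[] value) {
            data.put(key, value);
        }

        // merge: append to the stored value with the delimiter (what addAll() needs)
        void merge(String key, byte[] value) {
            byte[] existing = data.get(key);
            data.put(key, existing == null ? value : preMerge(Arrays.asList(existing, value)));
        }

        String getAsString(String key) {
            byte[] v = data.get(key);
            return v == null ? null : new String(v, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical serializer: UTF-8 bytes of each string. */
    static List<byte[]> serialize(List<String> values) {
        List<byte[]> bytes = new ArrayList<>(values.size());
        for (String v : values) {
            bytes.add(v.getBytes(StandardCharsets.UTF_8));
        }
        return bytes;
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        store.put("k", preMerge(serialize(Arrays.asList("a", "b"))));   // like update([a, b])
        store.merge("k", preMerge(serialize(Arrays.asList("c", "d")))); // like addAll([c, d])
        System.out.println(store.getAsString("k")); // prints a,b,c,d
        store.put("k", preMerge(serialize(Arrays.asList("x"))));         // like update([x])
        System.out.println(store.getAsString("k")); // prints x
    }
}
```

Strings are used only for readability. Joining with a delimiter is unambiguous only if each element can be decoded without searching for the delimiter (for example, with a serializer that knows each element's length), because a serialized element may itself contain the delimiter byte.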


> support addAll() in ListState
> -----------------------------
>
>                 Key: FLINK-7938
>                 URL: https://issues.apache.org/jira/browse/FLINK-7938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to
> {{ListState}} in batches more efficiently. This should give much better
> performance, especially for {{ListState}} backed by RocksDB.
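The performance argument in the issue can be made concrete by counting backend writes. In this hypothetical sketch (`SimpleListState`, `CountingBackend` and both implementations are invented for illustration and are not Flink's interfaces), an `addAll()` that loops over `add()` issues one write per element, while a batched override issues a single write for the whole list, analogous to the single `merge` of a pre-merged value in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical comparison of per-element vs batched addAll(); not Flink's API. */
public class AddAllSketch {

    /** Records every write so the two strategies can be compared. */
    static final class CountingBackend {
        final List<String> stored = new ArrayList<>();
        int writes = 0;

        void append(List<String> values) {
            writes++; // one write, however many values it carries
            stored.addAll(values);
        }
    }

    interface SimpleListState<V> {
        void add(V value);

        // Naive default: one backend write per element.
        default void addAll(List<V> values) {
            for (V v : values) {
                add(v);
            }
        }
    }

    static final class PerElementState implements SimpleListState<String> {
        final CountingBackend backend = new CountingBackend();

        @Override
        public void add(String value) {
            backend.append(Arrays.asList(value));
        }
    }

    static final class BatchedState implements SimpleListState<String> {
        final CountingBackend backend = new CountingBackend();

        @Override
        public void add(String value) {
            backend.append(Arrays.asList(value));
        }

        // Batched override: a single write for the whole list.
        @Override
        public void addAll(List<String> values) {
            if (values != null && !values.isEmpty()) {
                backend.append(values);
            }
        }
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "c", "d");
        PerElementState naive = new PerElementState();
        naive.addAll(batch);
        BatchedState batched = new BatchedState();
        batched.addAll(batch);
        System.out.println(naive.backend.writes + " vs " + batched.backend.writes); // prints 4 vs 1
    }
}
```

Both states end up holding the same elements; only the number of writes differs. With RocksDB each write goes through the Java API's native (JNI) layer and leaves a merge operand to be combined later, so fewer, larger writes are the expected gain, though the real speedup would have to be measured.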



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
