[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330515#comment-16330515 ]
ASF GitHub Bot commented on FLINK-7938:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5281#discussion_r162343086

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
    @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
                 try {
                     writeCurrentKeyWithGroupAndNamespace();
                     byte[] key = keySerializationStream.toByteArray();
    -                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    -                List<byte[]> bytes = new ArrayList<>(values.size());
    -                for (V value : values) {
    -                    keySerializationStream.reset();
    -                    valueSerializer.serialize(value, out);
    -                    bytes.add(keySerializationStream.toByteArray());
    +                byte[] premerge = getPreMergedValue(values);
    +                if (premerge != null) {
    +                    backend.db.put(columnFamily, writeOptions, key, premerge);
    +                } else {
    +                    throw new IOException("Failed pre-merge values in update()");
                     }
    +            } catch (IOException | RocksDBException e) {
    +                throw new RuntimeException("Error while updating data to RocksDB", e);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void addAll(List<V> values) throws Exception {
    +        if (values != null && !values.isEmpty()) {
    +            try {
    +                writeCurrentKeyWithGroupAndNamespace();
    +                byte[] key = keySerializationStream.toByteArray();
    -                byte[] premerge = MergeUtils.merge(bytes);
    +                byte[] premerge = getPreMergedValue(values);
                     if (premerge != null) {
    -                    backend.db.put(columnFamily, writeOptions, key, premerge);
    +                    backend.db.merge(columnFamily, writeOptions, key, premerge);
                     } else {
    -                    throw new IOException("Failed pre-merge values");
    +                    throw new IOException("Failed pre-merge values in addAll()");
                     }
                 } catch (IOException | RocksDBException e) {
                     throw new RuntimeException("Error while updating data to RocksDB", e);
                 }
             }
         }
    +
    +    private byte[] getPreMergedValue(List<V> values) throws IOException {
    +        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    +
    +        List<byte[]> bytes = new ArrayList<>(values.size());
    --- End diff --

    Interesting, do you have an idea why it did not work? I think it should be possible. In general, I am not a big fan of changing this code twice when we already expect to overhaul that part, but we can do it this time if it makes your life easier. Sorry that we could not get it done for the meetup, but I was blocked by another important matter :-(


> support addAll() in ListState
> -----------------------------
>
>                 Key: FLINK-7938
>                 URL: https://issues.apache.org/jira/browse/FLINK-7938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}}, so Flink can add elements to
> {{ListState}} in batch more efficiently. This should give much better
> performance, especially for {{ListState}} backed by RocksDB.
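
For readers skimming the diff: update() writes the pre-merged bytes with put, while addAll() writes them with merge. The sketch below is an in-memory illustration of that difference, not Flink's or RocksDB's actual code. The class ListStateSketch, its HashMap "database", the String elements, and the ',' delimiter between serialized elements are all assumptions made for this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the put/merge calls in the diff. */
class ListStateSketch {
    // Assumption: serialized elements are joined with a ',' byte.
    static final byte DELIMITER = ',';

    private final Map<String, byte[]> db = new HashMap<>();

    /** Joins all serialized elements into one value; null for an empty list. */
    static byte[] preMerge(List<String> values) {
        if (values == null || values.isEmpty()) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean first = true;
        for (String value : values) {
            if (!first) {
                out.write(DELIMITER);
            }
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
            first = false;
        }
        return out.toByteArray();
    }

    /** Models update(): a put replaces whatever is stored under the key. */
    void update(String key, List<String> values) {
        byte[] merged = preMerge(values);
        if (merged != null) {
            db.put(key, merged);
        }
    }

    /** Models addAll(): a merge appends to the value stored under the key. */
    void addAll(String key, List<String> values) {
        byte[] merged = preMerge(values);
        if (merged == null) {
            return;
        }
        byte[] existing = db.get(key);
        if (existing == null) {
            db.put(key, merged);
            return;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(existing, 0, existing.length);
        out.write(DELIMITER);
        out.write(merged, 0, merged.length);
        db.put(key, out.toByteArray());
    }

    /** Returns the stored value as text, or null if the key is absent. */
    String read(String key) {
        byte[] value = db.get(key);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ListStateSketch state = new ListStateSketch();
        state.update("k", Arrays.asList("a", "b"));
        state.addAll("k", Arrays.asList("c", "d"));
        System.out.println(state.read("k")); // prints a,b,c,d
        state.update("k", Arrays.asList("x"));
        System.out.println(state.read("k")); // prints x
    }
}
```

Either way, the whole batch reaches the store in a single write instead of one write per element, which is where the issue description expects the performance gain.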