[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332118#comment-16332118
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5281


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.
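
A minimal sketch of how a user function might call the proposed API, assuming it lands as {{ListState#addAll(List)}}; the operator, state name, and element type below are illustrative and not taken from the PR:

{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class BufferingFunction extends RichFlatMapFunction<String, String> {

	private transient ListState<String> buffer;

	@Override
	public void open(Configuration parameters) {
		// keyed list state, so the function must run on a keyed stream
		buffer = getRuntimeContext().getListState(
				new ListStateDescriptor<>("buffer", String.class));
	}

	@Override
	public void flatMap(String value, Collector<String> out) throws Exception {
		List<String> batch = Arrays.asList(value, value.toUpperCase());
		// one batched call instead of a loop of add() calls; for the RocksDB
		// backend this should become a single merge of the pre-serialized batch
		buffer.addAll(batch);
	}
}
{code}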





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332113#comment-16332113
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5281
  
Merging this.


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331289#comment-16331289
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r162479898
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
 				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
 				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
 				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
 				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
 				}
 			} catch (IOException | RocksDBException e) {
 				throw new RuntimeException("Error while updating data to RocksDB", e);
 			}
 		}
 	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

That'll be great, let's get this in. I will dive into FLINK-8441 in a 
couple days. Thanks!


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330515#comment-16330515
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r162343086
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
 				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
 				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
 				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
 				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
 				}
 			} catch (IOException | RocksDBException e) {
 				throw new RuntimeException("Error while updating data to RocksDB", e);
 			}
 		}
 	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

Interesting, do you have an idea why it did not work? I think it should be 
possible. In general, I am not a big fan of changing this code twice when we 
already plan to overhaul that part, but we can do it this time if it makes 
your life easier.

Sorry that we could not get it done in time for the meetup, but I was blocked 
by another important matter :-(


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327907#comment-16327907
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161905498
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
 				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
 				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
 				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
 				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
 				}
 			} catch (IOException | RocksDBException e) {
 				throw new RuntimeException("Error while updating data to RocksDB", e);
 			}
 		}
 	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

@StefanRRichter  I actually tried it before, but it didn't work out very 
well. I can give it another try.

I don't think this PR should address that issue, though, because the code is 
already there and this PR only moves it into its own method. Besides, it 
would be great if we could get the new API in before I announce it at our 
Seattle Flink meetup on Wednesday evening :) (Thanks in advance if that takes 
extra work on your end!)

I opened FLINK-8441 and I'll be working on it shortly afterwards.


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326911#comment-16326911
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161694419
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
 				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
 				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
 				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
 				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
 				}
 			} catch (IOException | RocksDBException e) {
 				throw new RuntimeException("Error while updating data to RocksDB", e);
 			}
 		}
 	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

Why not just serialize all objects into the stream and write the RocksDB 
separator byte between the serialized objects? This could improve performance 
(fewer temporary copies and objects) and might be equally readable, or even 
more so.

From a performance point of view, even `#toByteArray()` results in an 
unnecessary copy - we could pass the internal array plus offset and length to 
RocksDB for the insert.

What do you think?
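
A self-contained sketch of that idea, assuming the list state's merge operator joins entries with a single delimiter byte; the `DELIMITER` constant and the `Serializer` interface below are stand-ins rather than Flink classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

final class PreMergeSketch {

	/** Assumed single separator byte understood by the backend's merge operator. */
	private static final byte DELIMITER = ',';

	/** Stand-in for the state's value serializer. */
	interface Serializer<V> {
		void serialize(V value, DataOutputStream out) throws IOException;
	}

	static <V> byte[] preMerge(List<V> values, Serializer<V> serializer) throws IOException {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(buffer);

		boolean first = true;
		for (V value : values) {
			if (!first) {
				// write the separator directly, no per-value byte[] copy
				buffer.write(DELIMITER);
			}
			first = false;
			serializer.serialize(value, out);
		}
		out.flush();

		// still one final copy here; handing RocksDB the internal array plus
		// offset and length would avoid even that, as noted above
		return buffer.toByteArray();
	}
}
```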


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326905#comment-16326905
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161693211
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
@@ -756,6 +756,11 @@ public boolean isClearCalled() {
 		public void update(List<T> values) throws Exception {
 			clear();
 
+			addAll(values);
+		}
+
+		@Override
+		public void addAll(List<T> values) throws Exception {
 			if (values != null && !values.isEmpty()) {
--- End diff --

I think the check for `!values.isEmpty()` is not required and is just 
boilerplate - the following line also works for empty lists.
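
For reference, a sketch of the simplified mock method this suggests; the names 
mirror the quoted test helper and are illustrative. Since `List#addAll` is a 
no-op for an empty argument, only the null guard is needed:

```java
@Override
public void addAll(List<T> values) throws Exception {
	if (values != null) {
		list.addAll(values); // works for empty lists too, so no isEmpty() check
	}
}
```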


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326906#comment-16326906
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161693241
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
@@ -548,6 +548,11 @@ public boolean isClearCalled() {
 		public void update(List<T> values) throws Exception {
 			list.clear();
 
+			addAll(values);
+		}
+
+		@Override
+		public void addAll(List<T> values) throws Exception {
 			if (values != null || !values.isEmpty()) {
--- End diff --

Same here.


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326461#comment-16326461
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5281
  
Hi guys, can @StefanRRichter, @aljoscha, or any other committer take a look 
at this PR?

I'll host the Seattle Apache Flink Meetup this Wednesday and give a talk 
covering the new APIs `ListState#update()` and `ListState#addAll()`, so it 
would be great to get this merged before then. Thanks!


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321906#comment-16321906
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5281

[FLINK-7938] support addAll() in ListState

## What is the purpose of the change

Support `addAll()` in `ListState` so that Flink can add elements to 
`ListState` in batch more efficiently. This should give us much better 
performance, especially for `ListState` backed by RocksDB.

## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5281


commit 74e4e3cd7f81621963640774e174bc8814c609e6
Author: Bowen Li 
Date:   2018-01-02T19:21:28Z

update local branch

commit 362bad9a7259a7b75e034c81d488fd09ca506df3
Author: Bowen Li 
Date:   2018-01-04T01:35:11Z

remove sh

commit 24b5a38c73e812340b891f91706e75ca6a183673
Author: Bowen Li 
Date:   2018-01-11T09:11:55Z

add ListState#addAll()




> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2017-10-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224751#comment-16224751
 ] 

Aljoscha Krettek commented on FLINK-7938:
-

Sounds good 

> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.


