[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8411:
    Component/s: State Backends, Checkpointing

> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
>
>                 Key: FLINK-8411
>                 URL: https://issues.apache.org/jira/browse/FLINK-8411
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1
>
>
> {{HeapListState#add(null)}} clears the whole state. There has never been a
> unit test for {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
> public void add(V value) {
>     final N namespace = currentNamespace;
>     if (value == null) {
>         clear();
>         return;
>     }
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}
> {code:java}
> // RocksDBListState
> @Override
> public void add(V value) throws IOException {
>     try {
>         writeCurrentKeyWithGroupAndNamespace();
>         byte[] key = keySerializationStream.toByteArray();
>         keySerializationStream.reset();
>         DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
>         valueSerializer.serialize(value, out);
>         backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
>     } catch (Exception e) {
>         throw new RuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}
> The fix should make the behavior consistent between the two state backends
> and add a unit test for {{ListState#add(null)}}. As for the correct behavior,
> I believe {{add(null)}} should simply be ignored, leaving the existing state
> untouched.
> cc [~srichter]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
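The behavior proposed in the description (ignore {{add(null)}} and leave existing entries alone) can be sketched in a few lines. This is only an illustration under stated assumptions: `NullIgnoringListState`, its plain `HashMap` table and its method names are hypothetical stand-ins, not Flink's `HeapListState` or `StateTable`, and the sketch says nothing about how the RocksDB backend should be changed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a namespaced list state. It is NOT Flink's
// HeapListState; it only illustrates the proposed add(null) semantics.
final class NullIgnoringListState<N, V> {

    // Assumption: a plain map replaces Flink's StateTable for this sketch.
    private final Map<N, ArrayList<V>> stateTable = new HashMap<>();
    private N currentNamespace;

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    // Proposed behavior: a null value is ignored and existing entries survive.
    void add(V value) {
        if (value == null) {
            return;
        }
        stateTable.computeIfAbsent(currentNamespace, ns -> new ArrayList<V>()).add(value);
    }

    // Returns a copy of the current namespace's list, empty if nothing was added.
    List<V> get() {
        ArrayList<V> list = stateTable.get(currentNamespace);
        return list == null ? Collections.<V>emptyList() : new ArrayList<V>(list);
    }

    // Clearing stays an explicit operation instead of a side effect of add(null).
    void clear() {
        stateTable.remove(currentNamespace);
    }

    public static void main(String[] args) {
        NullIgnoringListState<String, Integer> state = new NullIgnoringListState<>();
        state.setCurrentNamespace("window-1");
        state.add(1);
        state.add(null); // ignored, the list keeps its single element
        state.add(2);
        System.out.println(state.get()); // prints [1, 2]
    }
}
```

A unit test for {{ListState#add(null)}} along the lines requested above would assert the same thing against each backend: after adding a value, then null, then another value, both non-null values are still present.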
[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8411:
    Description:
{{HeapListState#add(null)}} clears the whole state. There has never been a unit test for {{ListState#add(null)}} in {{StateBackendTestBase}}.

{code:java}
// HeapListState
@Override
public void add(V value) {
    final N namespace = currentNamespace;
    if (value == null) {
        clear();
        return;
    }
    final StateTable<K, N, ArrayList<V>> map = stateTable;
    ArrayList<V> list = map.get(namespace);
    if (list == null) {
        list = new ArrayList<>();
        map.put(namespace, list);
    }
    list.add(value);
}
{code}

{code:java}
// RocksDBListState
@Override
public void add(V value) throws IOException {
    try {
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = keySerializationStream.toByteArray();
        keySerializationStream.reset();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
        valueSerializer.serialize(value, out);
        backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
    } catch (Exception e) {
        throw new RuntimeException("Error while adding data to RocksDB", e);
    }
}
{code}

The fix should make the behavior consistent between the two state backends and add a unit test for {{ListState#add(null)}}. As for the correct behavior, I believe {{add(null)}} should simply be ignored, leaving the existing state untouched.

cc [~srichter]

  was:
You can see that HeapListState#add(null) will result in the whole state being cleared or wiped out.

{code:java}
// HeapListState
@Override
public void add(V value) {
    final N namespace = currentNamespace;
    if (value == null) {
        clear();
        return;
    }
    final StateTable<K, N, ArrayList<V>> map = stateTable;
    ArrayList<V> list = map.get(namespace);
    if (list == null) {
        list = new ArrayList<>();
        map.put(namespace, list);
    }
    list.add(value);
}
{code}

{code:java}
// RocksDBListState
@Override
public void add(V value) throws IOException {
    try {
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = keySerializationStream.toByteArray();
        keySerializationStream.reset();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
        valueSerializer.serialize(value, out);
        backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
    } catch (Exception e) {
        throw new RuntimeException("Error while adding data to RocksDB", e);
    }
}
{code}

> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
>
>                 Key: FLINK-8411
>                 URL: https://issues.apache.org/jira/browse/FLINK-8411
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1
>
>
> {{HeapListState#add(null)}} clears the whole state. There has never been a
> unit test for {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
> public void add(V value) {
>     final N namespace = currentNamespace;
>     if (value == null) {
>         clear();
>         return;
>     }
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}
> {code:java}
> // RocksDBListState
> @Override
> public void add(V value) throws IOException {
>     try {
>         writeCurrentKeyWithGroupAndNamespace();
>         byte[] key = keySerializationStream.toByteArray();
>         keySerializa
[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8411:
    Priority: Critical  (was: Major)

> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
>
>                 Key: FLINK-8411
>                 URL: https://issues.apache.org/jira/browse/FLINK-8411
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1
>
>
> You can see that HeapListState#add(null) will result in the whole state being
> cleared or wiped out.
> {code:java}
> // HeapListState
> @Override
> public void add(V value) {
>     final N namespace = currentNamespace;
>     if (value == null) {
>         clear();
>         return;
>     }
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}
> {code:java}
> // RocksDBListState
> @Override
> public void add(V value) throws IOException {
>     try {
>         writeCurrentKeyWithGroupAndNamespace();
>         byte[] key = keySerializationStream.toByteArray();
>         keySerializationStream.reset();
>         DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
>         valueSerializer.serialize(value, out);
>         backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
>     } catch (Exception e) {
>         throw new RuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)