[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8411:
----------------------------
    Description: 
Calling {{HeapListState#add(null)}} clears the whole state, while 
{{RocksDBListState#add(null)}} has no such null check and takes a different 
code path. {{StateBackendTestBase}} has never had a unit test for 
{{ListState#add(null)}}.

{code:java}
// HeapListState
@Override
        public void add(V value) {
                final N namespace = currentNamespace;

                if (value == null) {
                        clear();
                        return;
                }

                final StateTable<K, N, ArrayList<V>> map = stateTable;
                ArrayList<V> list = map.get(namespace);

                if (list == null) {
                        list = new ArrayList<>();
                        map.put(namespace, list);
                }
                list.add(value);
        }
{code}



{code:java}
// RocksDBListState
@Override
        public void add(V value) throws IOException {
                try {
                        writeCurrentKeyWithGroupAndNamespace();
                        byte[] key = keySerializationStream.toByteArray();
                        keySerializationStream.reset();
                        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
                        valueSerializer.serialize(value, out);
                        backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
                } catch (Exception e) {
                        throw new RuntimeException("Error while adding data to RocksDB", e);
                }
        }
{code}

The fix should make the behavior consistent between the two state backends 
and add a unit test for {{ListState#add(null)}}. As for the correct behavior, 
I believe {{add(null)}} should simply be ignored, with no effect on the 
existing state.
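
A minimal sketch of the proposed behavior, for illustration only. 
{{SketchListState}} and {{AddNullSketch}} are hypothetical stand-ins, not 
Flink's actual {{HeapListState}} or the eventual fix; the sketch only assumes 
that a null value is skipped instead of triggering {{clear()}}.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a heap-backed list state; not a Flink class. */
class SketchListState<N, V> {
    private final Map<N, ArrayList<V>> table = new HashMap<>();
    private N currentNamespace;

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    /** Appends a value; a null value is ignored and existing state is kept. */
    void add(V value) {
        if (value == null) {
            return; // assumed proposed behavior: ignore instead of clear()
        }
        table.computeIfAbsent(currentNamespace, ns -> new ArrayList<>()).add(value);
    }

    /** Returns a copy of the list for the current namespace (empty if none). */
    List<V> get() {
        ArrayList<V> list = table.get(currentNamespace);
        return list == null ? new ArrayList<>() : new ArrayList<>(list);
    }

    /** Removes the list for the current namespace. */
    void clear() {
        table.remove(currentNamespace);
    }
}

class AddNullSketch {
    public static void main(String[] args) {
        SketchListState<String, String> state = new SketchListState<>();
        state.setCurrentNamespace("window-1");
        state.add("a");
        state.add(null); // ignored; "a" must survive
        state.add("b");
        System.out.println(state.get()); // prints [a, b]
    }
}
```

A unit test for {{ListState#add(null)}} could assert the same thing against 
each backend: the elements added before and after the null are still present.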

cc [~srichter]

  was:
You can see that HeapListState#add(null) will result in the whole state being 
cleared or wiped out.

{code:java}
// HeapListState
@Override
        public void add(V value) {
                final N namespace = currentNamespace;

                if (value == null) {
                        clear();
                        return;
                }

                final StateTable<K, N, ArrayList<V>> map = stateTable;
                ArrayList<V> list = map.get(namespace);

                if (list == null) {
                        list = new ArrayList<>();
                        map.put(namespace, list);
                }
                list.add(value);
        }
{code}



{code:java}
// RocksDBListState
@Override
        public void add(V value) throws IOException {
                try {
                        writeCurrentKeyWithGroupAndNamespace();
                        byte[] key = keySerializationStream.toByteArray();
                        keySerializationStream.reset();
                        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
                        valueSerializer.serialize(value, out);
                        backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
                } catch (Exception e) {
                        throw new RuntimeException("Error while adding data to RocksDB", e);
                }
        }
{code}



> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8411
>                 URL: https://issues.apache.org/jira/browse/FLINK-8411
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
