[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()

2018-01-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8411:

Component/s: State Backends, Checkpointing

> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
> 
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There's never a unit test for {{List#add(null)}} 
> in {{StateBackendTestBase}}
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable map = stateTable;
>   ArrayList list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should correct the behavior to be consistent between the two state 
> backends, as well as adding a unit test for {{ListState#add(null)}}. For the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
> cc [~srichter]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()

2018-01-11 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8411:

Description: 
You can see that {{HeapListState#add(null)}} will result in the whole state 
being cleared or wiped out. There's never a unit test for {{List#add(null)}} in 
{{StateBackendTestBase}}

{code:java}
// HeapListState
@Override
public void add(V value) {
final N namespace = currentNamespace;

if (value == null) {
clear();
return;
}

final StateTable map = stateTable;
ArrayList list = map.get(namespace);

if (list == null) {
list = new ArrayList<>();
map.put(namespace, list);
}
list.add(value);
}
{code}



{code:java}
// RocksDBListState
@Override
public void add(V value) throws IOException {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
keySerializationStream.reset();
DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
valueSerializer.serialize(value, out);
backend.db.merge(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to 
RocksDB", e);
}
}
{code}

The fix should correct the behavior to be consistent between the two state 
backends, as well as adding a unit test for {{ListState#add(null)}}. For the 
correct behavior, I believe adding null with {{add(null)}} should simply be 
ignored without any consequences.

cc [~srichter]

  was:
You can see that HeapListState#add(null) will result in the whole state being 
cleared or wiped out.

{code:java}
// HeapListState
@Override
public void add(V value) {
final N namespace = currentNamespace;

if (value == null) {
clear();
return;
}

final StateTable map = stateTable;
ArrayList list = map.get(namespace);

if (list == null) {
list = new ArrayList<>();
map.put(namespace, list);
}
list.add(value);
}
{code}



{code:java}
// RocksDBListState
@Override
public void add(V value) throws IOException {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
keySerializationStream.reset();
DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
valueSerializer.serialize(value, out);
backend.db.merge(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to 
RocksDB", e);
}
}
{code}



> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
> 
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There's never a unit test for {{List#add(null)}} 
> in {{StateBackendTestBase}}
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable map = stateTable;
>   ArrayList list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = 

[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()

2018-01-11 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8411:

Priority: Critical  (was: Major)

> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
> 
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that HeapListState#add(null) will result in the whole state being 
> cleared or wiped out.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable map = stateTable;
>   ArrayList list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)