[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16061898#comment-16061898
 ] 

Evgeny Veretennikov commented on KAFKA-4750:
--------------------------------------------

[~mjsax], are you sure about the call chain in {{RocksDBStore}}? It is actually 
{{delete(key) -> put(key, null) -> putInternal(serdes.rawKey(key), 
serdes.rawValue(value))}}. See the {{RocksDBStore.put()}} method:

{code:java}
public synchronized void put(K key, V value) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    byte[] rawKey = serdes.rawKey(key);
    byte[] rawValue = serdes.rawValue(value);
    putInternal(rawKey, rawValue);
}
{code}

So {{delete()}} doesn't actually delete the key-value pair if 
{{serdes.rawValue(null)}} doesn't return {{null}}.

For example, see the attached test. There the store uses a custom serde that 
serializes null into the non-null value {{"123".getBytes()}}. The test fails with 
the current {{RocksDBStore.delete()}} implementation and passes with this change:

{noformat}
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -301,7 +301,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     public synchronized V delete(K key) {
         Objects.requireNonNull(key, "key cannot be null");
         V value = get(key);
-        put(key, null);
+        validateStoreOpen();
+        byte[] rawKey = serdes.rawKey(key);
+        putInternal(rawKey, null);
         return value;
     }
{noformat}
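
The difference between the two {{delete()}} variants can be reproduced without RocksDB. The following is only a minimal sketch, not the attached test: a {{TreeMap}} stands in for the database, the class and method names ({{NullSerdeDeleteSketch}}, {{deleteViaPut}}, {{deleteDirect}}) are hypothetical, and it assumes that {{putInternal()}} treats a null raw value as a delete, as the patch above implies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a TreeMap stands in for RocksDB; all names are hypothetical.
class NullSerdeDeleteSketch {

    // Backing "database"; keys are kept as plain strings for simplicity.
    final Map<String, byte[]> db = new TreeMap<>();

    // Mimics a value serializer that maps null to the non-null bytes of "123".
    static byte[] rawValue(String value) {
        return (value == null ? "123" : value).getBytes(StandardCharsets.UTF_8);
    }

    // Assumed stand-in for putInternal(): a null raw value removes the entry.
    void putInternal(String rawKey, byte[] rawValue) {
        if (rawValue == null) {
            db.remove(rawKey);
        } else {
            db.put(rawKey, rawValue);
        }
    }

    // Mirrors put(): the value always goes through the serializer.
    void put(String key, String value) {
        putInternal(key, rawValue(value));
    }

    // Unpatched delete: null is serialized first, so nothing is removed.
    void deleteViaPut(String key) {
        put(key, null);
    }

    // Patched delete: bypasses the value serializer and passes a raw null.
    void deleteDirect(String key) {
        putInternal(key, null);
    }

    public static void main(String[] args) {
        NullSerdeDeleteSketch store = new NullSerdeDeleteSketch();
        store.put("a", "1");

        store.deleteViaPut("a");
        System.out.println(store.db.containsKey("a")); // prints true: entry survives

        store.deleteDirect("a");
        System.out.println(store.db.containsKey("a")); // prints false: entry removed
    }
}
```

With the unpatched variant the key stays in the map, now holding the serialized form of null, which matches the symptom described in the issue.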

However, this problem indeed doesn't appear in 
{{ChangeLoggingKeyValueBytesStore}}; that was my mistake.

> KeyValueIterator returns null values
> ------------------------------------
>
>                 Key: KAFKA-4750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4750
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1, 0.11.0.0, 0.10.2.1
>            Reporter: Michal Borowiecki
>            Assignee: Evgeny Veretennikov
>              Labels: newbie
>
> The API for ReadOnlyKeyValueStore.range method promises the returned iterator 
> will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as there: when an entry 
> is deleted and the serializer does not return null when null is passed in, the 
> state store doesn't actually delete that key/value pair, but the iterator 
> returns a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in kafka streams, 
> perhaps with a similar approach as SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
