[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469182#comment-16469182 ]

ASF GitHub Bot commented on KAFKA-6878:
---------------------------------------

guozhangwang closed pull request #4978: KAFKA-6878 NPE when querying global state store not in READY state
URL: https://github.com/apache/kafka/pull/4978

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 45f606f375e..16684e39c1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,7 +163,10 @@ public boolean isOpen() {
     }
 
     private byte[] getInternal(final Bytes key) {
-        final LRUCacheEntry entry = cache.get(cacheName, key);
+        LRUCacheEntry entry = null;
+        if (cache != null) {
+            entry = cache.get(cacheName, key);
+        }
         if (entry == null) {
             final byte[] rawValue = underlying.get(key);
             if (rawValue == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 58111a670d6..1d0455b2b6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -160,6 +160,9 @@ public synchronized void put(final Bytes key, final byte[] value, final long tim
         validateStoreOpen();
         final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
         final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
+        if (cache == null) {
+            return underlying.fetch(key, timestamp);
+        }
         final LRUCacheEntry entry = cache.get(name, cacheKey);
         if (entry == null) {
             return underlying.fetch(key, timestamp);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> NPE when querying global state store not in READY state
> -------------------------------------------------------
>
>                 Key: KAFKA-6878
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6878
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Salazar
>            Priority: Major
>
> Info: using Kafka 1.1.0 (Confluent 4.1.0).
> We're trying to query a global state store, but if we query too soon after
> the application starts, we get a
> NullPointerException (CachingKeyValueStore.java:166).
> We have verified that the key is non-null.
> If we wait long enough (about 5 minutes after start with our current amount
> of data), we get an answer, but before then we get NPEs.
>  
> It looks like there is no check to ensure that `initInternal` has actually
> set the cache before it is used in the get method.
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}
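
For illustration only, here is a minimal standalone sketch of the failure mode
described in the report and of the kind of null guard that avoids it. It does
not use the Kafka Streams classes: GuardedCachingStore, init(), and
putUnderlying() are hypothetical names, and plain maps stand in for the cache
and the underlying store. The assumption is simply that the cache field stays
null until initialization has run, so a read that arrives early should fall
through to the underlying store instead of dereferencing the cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a caching store; not the Kafka Streams implementation.
public class GuardedCachingStore {
    // Remains null until init() runs, mirroring the state described in the report.
    private Map<String, String> cache;
    // Plain map standing in for the underlying (non-cached) store.
    private final Map<String, String> underlying = new HashMap<>();

    // Hypothetical initialization hook that creates the cache.
    public void init() {
        cache = new HashMap<>();
    }

    // Writes directly to the underlying store, bypassing the cache.
    public void putUnderlying(final String key, final String value) {
        underlying.put(key, value);
    }

    public String get(final String key) {
        String cached = null;
        if (cache != null) { // guard: consult the cache only once it exists
            cached = cache.get(key);
        }
        if (cached != null) {
            return cached;
        }
        // Cache miss, or cache not initialized yet: read the underlying store.
        final String value = underlying.get(key);
        if (value != null && cache != null) {
            cache.put(key, value); // populate the cache for later reads
        }
        return value;
    }

    public static void main(final String[] args) {
        final GuardedCachingStore store = new GuardedCachingStore();
        store.putUnderlying("visit-1", "payload");
        // Read before init(): served from the underlying store, no NPE.
        System.out.println(store.get("visit-1"));
        store.init();
        // Read after init(): same value, now also placed in the cache.
        System.out.println(store.get("visit-1"));
    }
}
```

Without the `cache != null` check, the first get() in main would throw a
NullPointerException, which is the behavior the stack trace above shows.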



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
