[ 
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469731#comment-16469731
 ] 

ASF GitHub Bot commented on KAFKA-6878:
---------------------------------------

guozhangwang closed pull request #4988: KAFKA-6878 Switch the order of 
underlying.init and initInternal
URL: https://github.com/apache/kafka/pull/4988
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 16684e39c1a..525e92df22f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -60,8 +60,8 @@
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        underlying.init(context, root);
         initInternal(context);
+        underlying.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
         streamThread = Thread.currentThread();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 068ac88ffba..1bb2ea750c5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -63,8 +63,8 @@
 
     public void init(final ProcessorContext context, final StateStore root) {
         topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
-        bytesStore.init(context, root);
         initInternal((InternalProcessorContext) context);
+        bytesStore.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 1d0455b2b6f..7e58b684f23 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -67,8 +67,8 @@
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        underlying.init(context, root);
         initInternal(context);
+        underlying.init(context, root);
         keySchema.init(context.applicationId());
     }
 
@@ -178,7 +178,9 @@ public synchronized void put(final Bytes key, final byte[] 
value, final long tim
         validateStoreOpen();
 
         final WindowStoreIterator<byte[]> underlyingIterator = 
underlying.fetch(key, timeFrom, timeTo);
-
+        if (cache == null) {
+            return underlyingIterator;
+        }
         final Bytes cacheKeyFrom = 
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom));
         final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, cacheKeyFrom, cacheKeyTo);
@@ -201,7 +203,9 @@ public synchronized void put(final Bytes key, final byte[] 
value, final long tim
         validateStoreOpen();
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
underlying.fetch(from, to, timeFrom, timeTo);
-
+        if (cache == null) {
+            return underlyingIterator;
+        }
         final Bytes cacheKeyFrom = 
cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
         final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, cacheKeyFrom, cacheKeyTo);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> NPE when querying global state store not in READY state
> -------------------------------------------------------
>
>                 Key: KAFKA-6878
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6878
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Salazar
>            Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after 
> the application is started we get a 
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after 
> start) we get an answer but before then we get NPEs.
>  
> Looks like there isn't a check to ensure `initInternal` actually sets the 
> cache before you're allowed to use it in the get method
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at 
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to