[
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16467689#comment-16467689
]
Ted Yu commented on KAFKA-6878:
-------------------------------
How about this:
{code}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueSto
index 45f606f..d1080f8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,6 +163,9 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
private byte[] getInternal(final Bytes key) {
+ if (cache == null) {
+ return null;
+ }
final LRUCacheEntry entry = cache.get(cacheName, key);
if (entry == null) {
final byte[] rawValue = underlying.get(key);
{code}
> NPE when querying global state store not in READY state
> -------------------------------------------------------
>
> Key: KAFKA-6878
> URL: https://issues.apache.org/jira/browse/KAFKA-6878
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Salazar
> Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after
> the application is started we get a
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after
> start) we get an answer but before then we get NPEs.
>
> Looks like there isn't a check to ensure `initInternal` actually sets the
> cache before you're allowed to use it in the get method
>
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)