[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466681#comment-16466681 ]
Ted Yu commented on KAFKA-6878:
-------------------------------

How about the following change?
{code}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKe
index 45f606f..1ceb13b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,6 +163,9 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     private byte[] getInternal(final Bytes key) {
+        if (key == null) {
+            return null;
+        }
         final LRUCacheEntry entry = cache.get(cacheName, key);
         if (entry == null) {
             final byte[] rawValue = underlying.get(key);
{code}
Client can check against null and re-poll if null is encountered.

> NPE when querying global state store not in READY state
> -------------------------------------------------------
>
>                 Key: KAFKA-6878
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6878
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Salazar
>            Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after
> the application is started we get a
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after
> start) we get an answer but before then we get NPEs.
>
> Looks like there isn't a check to ensure `initInternal` actually sets the
> cache before you're allowed to use it in the get method
>
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
>     at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
>     at io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
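
For illustration, the "check against null and re-poll" suggestion in the comment could look roughly like the client-side sketch below. It assumes the lookup returns null while the store is still loading. `RetryingLookup`, `pollUntilNonNull`, `maxAttempts` and `backoffMillis` are hypothetical names, not part of Kafka Streams, and the lookup is passed in as a plain `Supplier` so the sketch runs without any Kafka dependency.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical client-side helper; not part of the Kafka Streams API. */
final class RetryingLookup {

    private RetryingLookup() { }

    /**
     * Calls the lookup up to maxAttempts times, sleeping backoffMillis
     * between attempts, and returns the first non-null result.
     * Returns Optional.empty() if every attempt yields null or the
     * thread is interrupted while waiting.
     */
    static <T> Optional<T> pollUntilNonNull(final Supplier<T> lookup,
                                            final int maxAttempts,
                                            final long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            final T value = lookup.get();
            if (value != null) {
                return Optional.of(value);
            }
            // Only sleep if another attempt will follow.
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

A caller would wrap its own store query in the supplier, for example `RetryingLookup.pollUntilNonNull(() -> store.get(key), 10, 500L)`, where `store` and `key` stand in for the application's own objects. Note that a null result cannot distinguish "store not ready yet" from "key not present", so the attempt limit matters.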