[ https://issues.apache.org/jira/browse/KAFKA-10246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman resolved KAFKA-10246.
--------------------------------------------
    Resolution: Fixed

> AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10246
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10246
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: linux, windows, java 11
>            Reporter: Peter Pringle
>            Priority: Major
>
> A NullPointerException is seen when a KTable state store is modified by a
> punctuated method that is added to a topology via the DSL processor/KTable
> valueTransformer methods.
> It seems valid for AbstractProcessorContext.topic() to return null; however,
> the check below throws a NullPointerException before a null can be returned.
> {quote}if (topic.equals(NONEXIST_TOPIC)) {
> {quote}
> Made a local fix to reverse the ordering of the check (i.e. avoid calling
> equals() on the null) and this appears to fix the issue and sends the change
> to the state store's changelog topic.
> {quote}if (NONEXIST_TOPIC.equals(topic)) {
> {quote}
> Stacktrace below
> {{2020-07-02 07:29:46,829 [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the following error during processing:}}
> {{java.lang.NullPointerException: null}}
> {{ at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
> {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
> {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
> {{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}}
> {{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
> {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
> {{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}}
> {{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
> {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
> {{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}}
> {{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}}
> {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)}}
> {{ at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)}}
> {{ at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)}}
> {{ at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)}}
> {{ at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
> {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
> {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
> {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
> {{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55) // punctuated lambda - user code}}
> {{ at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) // iterates over the state store and cleans up old items}}
> {{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)}}
> {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)}}
> {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)}}
> {{ at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)}}
> {{ at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)}}
> {{ at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)}}
> {{ at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)}}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
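
For readers skimming the report, the reordering it describes can be illustrated in isolation. The sketch below is not Kafka code: the class name, the method names, and the value given to NONEXIST_TOPIC are all placeholders chosen for illustration. It only shows why calling equals() on a possibly-null topic throws, while calling it on the constant does not.

```java
// Standalone illustration only; NOT the Kafka source.
// NullSafeTopicCheck, topicUnsafe and topicSafe are hypothetical names.
class NullSafeTopicCheck {

    // Placeholder value (an assumption); the report only names the constant.
    static final String NONEXIST_TOPIC = "placeholder-nonexistent-topic";

    // Ordering quoted first in the report: dereferences `topic`,
    // so a null topic throws NullPointerException before anything is returned.
    static String topicUnsafe(String topic) {
        if (topic.equals(NONEXIST_TOPIC)) {
            return null;
        }
        return topic;
    }

    // Reversed ordering from the reporter's local fix: the constant is never
    // null, and String.equals(null) is false, so a null topic falls through
    // and is returned as null.
    static String topicSafe(String topic) {
        if (NONEXIST_TOPIC.equals(topic)) {
            return null;
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println("safe(null)   -> " + topicSafe(null));
        System.out.println("safe(orders) -> " + topicSafe("orders"));
        try {
            topicUnsafe(null);
        } catch (NullPointerException e) {
            System.out.println("unsafe(null) threw NullPointerException");
        }
    }
}
```

In this sketch both orderings behave identically for non-null topics; they differ only when the topic is null, which according to the report is the case when a store is written from a punctuator.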