[ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler updated KAFKA-10515: --------------------------------- Priority: Critical (was: Blocker) > NPE: Foreign key join serde may not be initialized with default serde if > application is distributed > --------------------------------------------------------------------------------------------------- > > Key: KAFKA-10515 > URL: https://issues.apache.org/jira/browse/KAFKA-10515 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.6.0, 2.5.1 > Reporter: Thorsten Hake > Priority: Critical > Fix For: 2.7.0, 2.5.2, 2.6.1 > > > The fix of KAFKA-9517 fixed the initialization of the foreign key joins > serdes for KStream applications that do not run distributed over multiple > instances. > However, if an application runs distributed over multiple instances, the > foreign key join serdes may still not be initialized leading to the following > NPE: > {noformat} > Encountered the following error during > processing:java.lang.NullPointerException: null > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85) > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52) > at > org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59) > at > org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50) > at > org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27) > at > org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487) > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102) > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) > at > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat} > This happens because the processors for foreign key joins will be distributed > across multiple tasks. The serde will only be initialized with the default > serde during the initialization of the task containing the sink node > ("subscription-registration-sink"). So if the task containing the > SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to > the same instance as the task containing the sink node, a NPE will be thrown > because the Serde of the state store used within the > SubscriptionStoreReceiveProcessor is not initialized. -- This message was sent by Atlassian Jira (v8.3.4#803005)