Thorsten Hake created KAFKA-10515:
-------------------------------------
Summary: NPE: Foreign key join serde may not be initialized with
default serde if application is distributed
Key: KAFKA-10515
URL: https://issues.apache.org/jira/browse/KAFKA-10515
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.5.1, 2.6.0
Reporter: Thorsten Hake
The fix for KAFKA-9517 corrected the initialization of the foreign key join
serdes for KStream applications that run on a single instance.
However, if an application is distributed over multiple instances, the
foreign key join serdes may still not be initialized, leading to the following
NPE:
{noformat}
Encountered the following error during processing:
java.lang.NullPointerException: null
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
{noformat}
This happens because the processors for a foreign key join are distributed
across multiple tasks. The serde is only initialized with the default
serde during the initialization of the task containing the sink node
("subscription-registration-sink"). So if the task containing the
SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to
the same instance as the task containing the sink node, an NPE is thrown
because the serde of the state store used within the
SubscriptionStoreReceiveProcessor is never initialized.
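
The failure mode can be illustrated with a small, dependency-free sketch. This is not Kafka Streams code: LazySerdeSketch, WrapperSerializer, setIfUnset and runInstance are hypothetical names, and the model assumes only what is described above, namely that each instance has its own serializer object and that the default is injected solely when the sink task is initialized on that instance.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class LazySerdeSketch {

    // Hypothetical stand-in for a wrapper serializer whose inner serializer
    // was not supplied by the user and must be filled in with a default.
    static final class WrapperSerializer {
        private Function<String, byte[]> inner; // stays null until a default is injected

        void setIfUnset(Function<String, byte[]> defaultSerializer) {
            if (inner == null) {
                inner = defaultSerializer;
            }
        }

        byte[] serialize(String value) {
            return inner.apply(value); // throws NullPointerException if never initialized
        }
    }

    static final Function<String, byte[]> DEFAULT_SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Simulates one application instance with its own copy of the serializer.
    // Returns false if the store write on this instance hits the NPE.
    static boolean runInstance(boolean hostsSinkTask, boolean hostsReceiveTask) {
        WrapperSerializer serializer = new WrapperSerializer();
        if (hostsSinkTask) {
            // Assumption: only the sink task's initialization injects the default.
            serializer.setIfUnset(DEFAULT_SERIALIZER);
        }
        if (hostsReceiveTask) {
            try {
                serializer.serialize("foreign-key");
            } catch (NullPointerException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("both tasks on one instance: " + runInstance(true, true));
        System.out.println("receive task only:          " + runInstance(false, true));
    }
}
```

In this model, an instance that hosts both tasks succeeds, while an instance that hosts only the receive task fails, which matches the single-instance versus distributed behavior reported here.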