[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-10-29 Thread John Roesler (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10515:
-
Fix Version/s: 2.6.1

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Critical
>             Fix For: 2.7.0, 2.6.1
>
>
> The fix for KAFKA-9517 corrected the initialization of the foreign key join 
> serdes for Kafka Streams applications that do not run distributed over 
> multiple instances.
> However, if an application runs distributed over multiple instances, the 
> foreign key join serdes may still not be initialized, leading to the 
> following NPE:
> {noformat}
> Encountered the following error during processing:java.lang.NullPointerException: null
>   at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
>   at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
>   at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
>   at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
>   at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
>   at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
>   at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
>   at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
>   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
>   at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
>   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> This happens because the processors for foreign key joins will be distributed 
> across multiple tasks. The serde will only be initialized with the default 
> serde during the initialization of the task containing the sink node 
> ("subscription-registration-sink"). So if the task containing the 
> SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to 
> the same instance as the task 

[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-10-21 Thread John Roesler (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10515:
-
Fix Version/s: (was: 2.6.1)
   (was: 2.5.2)

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Critical
>             Fix For: 2.7.0
>
>

[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-10-09 Thread John Roesler (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10515:
-
Priority: Critical  (was: Blocker)

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Critical
>             Fix For: 2.7.0, 2.5.2, 2.6.1
>
>

[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-09-24 Thread John Roesler (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10515:
-
Fix Version/s: 2.6.1
   2.5.2
   2.7.0

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Blocker
>             Fix For: 2.7.0, 2.5.2, 2.6.1
>
>

[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-09-24 Thread John Roesler (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10515:
-
Priority: Blocker  (was: Major)

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Blocker
>
> This happens because the processors for foreign key joins will be distributed 
> across multiple tasks. The serde will only be initialized with the default 
> serde during the initialization of the task containing the sink node 
> ("subscription-registration-sink"). So if the task containing the 
> SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to 
> the same instance as the task containing the sink node, an NPE
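
The failure mode described in the quoted report can be pictured with a small toy model. The Java sketch below is not Kafka Streams code and makes no claim about how the actual fix works: the names WrapperSerializer, initIfUnset, DEFAULT and receiveTaskFails are all hypothetical. It assumes only what the description states, namely that the wrapper serde starts without an inner serializer and that only the task containing the sink node supplies the default.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class FkJoinSerdeSketch {

    /** Hypothetical stand-in for a wrapper serializer whose inner serializer may be unset. */
    static final class WrapperSerializer {
        private Function<String, byte[]> inner; // stays null until a default is supplied

        WrapperSerializer(Function<String, byte[]> inner) {
            this.inner = inner;
        }

        /** Applies the default serializer only if no serializer has been set yet. */
        void initIfUnset(Function<String, byte[]> defaultSerializer) {
            if (inner == null) {
                inner = defaultSerializer;
            }
        }

        byte[] serialize(String key) {
            // Throws NullPointerException when the inner serializer was never initialized.
            return inner.apply(key);
        }
    }

    /** Hypothetical default serializer, standing in for the configured default serde. */
    static final Function<String, byte[]> DEFAULT = s -> s.getBytes(StandardCharsets.UTF_8);

    /**
     * Models a single application instance that runs the "subscription-receive" task.
     * The wrapper is initialized only if the sink-node task is assigned to this instance too.
     * Returns true if serializing on the receive side fails with an NPE.
     */
    static boolean receiveTaskFails(boolean sinkTaskOnSameInstance) {
        WrapperSerializer serializer = new WrapperSerializer(null);
        if (sinkTaskOnSameInstance) {
            serializer.initIfUnset(DEFAULT); // initialization happens in the sink task only
        }
        try {
            serializer.serialize("some-key");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(receiveTaskFails(true));  // false: both tasks on one instance
        System.out.println(receiveTaskFails(false)); // true: receive task on another instance
    }
}
```

In this model the NPE disappears as soon as every user of the wrapper calls initIfUnset itself instead of relying on the sink task having run in the same instance.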