[ 
https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032663#comment-17032663
 ] 

Matthias J. Sax commented on KAFKA-9518:
----------------------------------------

Thanks for digging out the other Jira tickets [~vvcephei]. Can we properly link
all of those as "related" so they show up at the top? They get lost easily in
the comments. It is still unclear to me to what extent we can fix this, though...

> NullPointerException on out-of-order topologies
> -----------------------------------------------
>
>                 Key: KAFKA-9518
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9518
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.1, 2.4.0, 2.3.1
>            Reporter: Murilo Tavares
>            Priority: Minor
>         Attachments: kafka-streams-testing.zip
>
>
> I have a KafkaStreams app that dynamically builds a topology based on a Map of
> input-to-output topics. Since the map was not sorted, its iteration order was
> unpredictable, and different instances could end up with different orders.
> When this happens, KafkaStreams throws an exception during rebalance.
>  
> I was able to reproduce this using the attached Java project. It is a simple
> Maven project with a single class. It starts 2 instances in parallel with the
> same input-to-output topics, but one instance takes the topics in reverse
> order.
>  
> The exception is this:
> {noformat}
> Exception in thread "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to rebalance.
>  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: java.lang.NullPointerException
>  at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:234)
>  at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176)
>  at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
>  at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
>  at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
>  at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> ... 3 more{noformat}
>  
> The topology descriptions of both instances (note that sub-topologies 0 and 1
> are swapped between them):
> {code:java}
> // instance1
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [topicA])
>       --> KSTREAM-SINK-0000000001
>     Sink: KSTREAM-SINK-0000000001 (topic: topicA-repartitioned)
>       <-- KSTREAM-SOURCE-0000000000
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000002 (topics: [topicB])
>       --> KSTREAM-SINK-0000000003
>     Sink: KSTREAM-SINK-0000000003 (topic: topicB-repartitioned)
>       <-- KSTREAM-SOURCE-0000000002
> // instance2
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [topicB])
>       --> KSTREAM-SINK-0000000001
>     Sink: KSTREAM-SINK-0000000001 (topic: topicB-repartitioned)
>       <-- KSTREAM-SOURCE-0000000000
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000002 (topics: [topicA])
>       --> KSTREAM-SINK-0000000003
>     Sink: KSTREAM-SINK-0000000003 (topic: topicA-repartitioned)
>       <-- KSTREAM-SOURCE-0000000002{code}
> In my actual project, I fixed the issue by sorting the topics map so that all
> instances build the topology in the same order, but it would be nice to at
> least have better error handling in this case.
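
A minimal, Kafka-free Java sketch of the workaround described above, assuming
(as the two topology descriptions show) that sub-topology numbers follow the
order in which the input/output pairs are added to the builder. The class
`DeterministicTopologyOrder` and its methods are hypothetical: they only record
that order instead of calling the real `StreamsBuilder`; in an actual
application the loop body would be something like
`builder.stream(input).to(output)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeterministicTopologyOrder {

    // Hypothetical stand-in for the topology-building loop. Instead of
    // calling builder.stream(input).to(output), it records the order in
    // which the pairs would be added, i.e. the sub-topology numbering.
    public static List<String> subtopologyOrder(Map<String, String> inputToOutput) {
        List<String> order = new ArrayList<>();
        for (Map.Entry<String, String> e : inputToOutput.entrySet()) {
            order.add(e.getKey() + " -> " + e.getValue());
        }
        return order;
    }

    // The workaround from the report: copy the map into a TreeMap so every
    // instance iterates the topics in the same (lexicographic key) order.
    public static List<String> sortedSubtopologyOrder(Map<String, String> inputToOutput) {
        return subtopologyOrder(new TreeMap<>(inputToOutput));
    }

    public static void main(String[] args) {
        // Both instances get the same pairs, but in a different order.
        Map<String, String> instance1 = new LinkedHashMap<>();
        instance1.put("topicA", "topicA-repartitioned");
        instance1.put("topicB", "topicB-repartitioned");

        Map<String, String> instance2 = new LinkedHashMap<>();
        instance2.put("topicB", "topicB-repartitioned");
        instance2.put("topicA", "topicA-repartitioned");

        // Unsorted: "sub-topology 0" means a different pair on each instance.
        System.out.println(subtopologyOrder(instance1)); // [topicA -> topicA-repartitioned, topicB -> topicB-repartitioned]
        System.out.println(subtopologyOrder(instance2)); // [topicB -> topicB-repartitioned, topicA -> topicA-repartitioned]

        // Sorted: both instances agree on the numbering.
        System.out.println(sortedSubtopologyOrder(instance1)); // [topicA -> topicA-repartitioned, topicB -> topicB-repartitioned]
        System.out.println(sortedSubtopologyOrder(instance2)); // [topicA -> topicA-repartitioned, topicB -> topicB-repartitioned]
    }
}
```

Copying into a `TreeMap` is just one way to get a deterministic order; any
ordering that is stable and identical across all instances would serve the
same purpose.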



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
