[ https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Srishty Agrawal updated STORM-2514:
-----------------------------------
Description: 
While working on [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker logs were generated with debug mode on. The information they printed about the mapping between task IDs and Kafka partitions contradicted my assumptions. I ran a topology that used KafkaSpout from the storm-kafka-client module, with a parallelism hint of 2 (number of executors) and a total of 16 tasks.

The log lines below show the assigned mapping between executors and Kafka partitions:
{noformat}
o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO] Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] for group kafkaSpoutTestGroup
o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce, topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO] Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] for group kafkaSpoutTestGroup
o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126, topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
{noformat}
These lines indicate that only the tasks in Executor1 (3 10), i.e. task IDs 3, 4, 5, 6, 7, 8, 9 and 10, should read from Kafka partitions 4, 5, 6 and 7. Similarly, the tasks in Executor2 (11 18) should read from Kafka partitions 0, 1, 2 and 3. These log lines are printed by tasks 10 and 15 in their respective executors.

The logs for individually emitted messages, however, do not agree with this assumption.
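The expectation described above can be written down as a small lookup. The following is a minimal Java sketch under the same assumptions this report makes. First, the label {{executor[3 10]}} in a thread name denotes the inclusive task-ID range 3 to 10. Second, every task in an executor reads exactly the partitions assigned to that executor's consumer. {{ExpectedPartitionLookup}}, {{recordAssignment}} and {{expectedPartitions}} are hypothetical names used for illustration only. They are not Storm or storm-kafka-client APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Storm or storm-kafka-client API). It encodes the
// assumption made in this report: "executor[3 10]" means the executor owns
// task IDs 3..10 inclusive, and every task in that executor reads exactly the
// partitions assigned to the executor's KafkaConsumer.
final class ExpectedPartitionLookup {
    // Matches the executor label seen in the worker thread names quoted above.
    private static final Pattern EXECUTOR_LABEL =
            Pattern.compile("executor\\[(\\d+) (\\d+)\\]");

    // One recorded executor: inclusive task-ID range plus its sorted partitions.
    private static final class Entry {
        final int firstTask;
        final int lastTask;
        final List<Integer> partitions;

        Entry(int firstTask, int lastTask, List<Integer> partitions) {
            this.firstTask = firstTask;
            this.lastTask = lastTask;
            this.partitions = partitions;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    // Records the partitions from a "Setting newly assigned partitions" line for
    // the executor named in threadName, e.g. "Thread-12-kafkaspout-executor[3 10]".
    void recordAssignment(String threadName, List<Integer> partitions) {
        Matcher m = EXECUTOR_LABEL.matcher(threadName);
        if (!m.find()) {
            throw new IllegalArgumentException("no executor label in " + threadName);
        }
        List<Integer> sorted = new ArrayList<>(partitions);
        Collections.sort(sorted);
        entries.add(new Entry(Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                Collections.unmodifiableList(sorted)));
    }

    // Partitions a task is expected to read under the assumption above, or an
    // empty list if no recorded executor covers the task ID.
    List<Integer> expectedPartitions(int taskId) {
        for (Entry e : entries) {
            if (taskId >= e.firstTask && taskId <= e.lastTask) {
                return e.partitions;
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        ExpectedPartitionLookup lookup = new ExpectedPartitionLookup();
        // Partition numbers copied from the two "Setting newly assigned partitions" lines.
        lookup.recordAssignment("Thread-12-kafkaspout-executor[3 10]", Arrays.asList(4, 6, 5, 7));
        lookup.recordAssignment("Thread-8-kafkaspout-executor[11 18]", Arrays.asList(2, 1, 3, 0));

        System.out.println(lookup.expectedPartitions(3));  // [4, 5, 6, 7]
        System.out.println(lookup.expectedPartitions(15)); // [0, 1, 2, 3]
    }
}
```

Under these assumptions, task 3 should only ever emit tuples from partitions 4 to 7. The emitted-message logs contradict that expectation.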
For example, in the log line below, task 3, which runs in Executor1, reads from partition 2 (the second value inside the square brackets) instead of 4, 5, 6 or 7. (As part of debugging STORM-2506, I added code to print the task ID right after the component ID.)
{noformat}
Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 default [8topic, 2, 0, null, 1]
{noformat}
This behavior is summarized in the table below:
{noformat}
Task IDs                         Partitions
3, 4, 7, 8, 9, 11, 15, 18        0, 1, 2, 3
5, 6, 10, 12, 13, 14, 16, 17     4, 5, 6, 7
{noformat}
[You can find the relevant parts of the log file here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]

Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, 17}} correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} to Executor2? Are (3 10) not the starting and ending task IDs of Executor1?

Another interesting point is that tasks 10 and 15 always read from the partitions they claimed when the partition assignments were set.

If my assumptions are correct, there is a bug in the way the mapping information is passed to the worker logs. If not, we need to update our docs.

was:
While working on [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker logs were generated with debug mode on. The information they printed about the mapping between task IDs and Kafka partitions contradicted my assumptions. I ran a topology that used KafkaSpout from the storm-kafka-client module, with a parallelism hint of 2 (number of executors) and a total of 16 tasks.
The log lines below show the assigned mapping between executors and Kafka partitions:
{noformat}
o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO] Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] for group kafkaSpoutTestGroup
o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce, topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO] Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] for group kafkaSpoutTestGroup
o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126, topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
{noformat}
These lines indicate that all the tasks in Executor1 (3 10) should read from Kafka partitions 4, 5, 6 and 7. Similarly, the tasks in Executor2 (11 18) should read from Kafka partitions 0, 1, 2 and 3. These log lines are printed by tasks 10 and 15 in their respective executors.

The logs for individually emitted messages, however, do not agree with this assumption. For example, in the log line below, task 3, which runs in Executor1, reads from partition 2 (the second value inside the square brackets) instead of 4, 5, 6 or 7. (As part of debugging STORM-2506, I added code to print the task ID right after the component ID.)
{noformat}
Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 default [8topic, 2, 0, null, 1]
{noformat}
This behavior is summarized in the table below:
{noformat}
Task IDs                         Partitions
3, 4, 7, 8, 9, 11, 15, 18        0, 1, 2, 3
5, 6, 10, 12, 13, 14, 16, 17     4, 5, 6, 7
{noformat}
[You can find the relevant parts of the log file here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]

Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, 17}} correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} to Executor2? Are (3 10) not the starting and ending task IDs of Executor1?

Another interesting point is that tasks 10 and 15 always read from the partitions they claimed when the partition assignments were set.

If my assumptions are correct, there is a bug in the way the mapping information is passed to the worker logs. If not, we need to update our docs.

> Incorrect logs for mapping between Kafka partitions and task IDs
> ----------------------------------------------------------------
>
>                 Key: STORM-2514
>                 URL: https://issues.apache.org/jira/browse/STORM-2514
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>            Reporter: Srishty Agrawal

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)