[ https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013239#comment-16013239 ]
Srishty Agrawal edited comment on STORM-2514 at 5/16/17 11:08 PM:
------------------------------------------------------------------

Thanks for the clarification. Please find answers to your questions inline:

*How did you get the task lists you posted?*
{noformat}
Task IDs ------- 3, 4, 7, 8, 9, 11, 15, 18 ------------ Partitions 0, 1, 2, 3
Task IDs ------- 5, 6, 10, 12, 13, 14, 16, 17 --------- Partition 4, 5, 6, 7
{noformat}
The above table has been derived from the worker logs. For instance, if you search for {{kafkaspout 3}}, which means Task ID 3, you will find messages which are being read from partitions 0, 1, 2 and 3. This table seems to convey that
{noformat}
5, 6, 10, 12, 13, 14, 16, 17 correspond to executor1 and
3, 4, 7, 8, 9, 11, 15, 18 correspond to executor2
{noformat}
If I assume that Executor1 should only be running tasks 3, 4, 5, 6, 7, 8, 9, 10, then only these tasks should be reading from partitions 4, 5, 6 or 7.

*I don't think it's exactly right that all the tasks in executor1 (3 10) will be reading from topics 4,6,5,7.*
Sorry for the confusion, I meant that only tasks in Executor1 should read from partitions 4, 5, 6 or 7. I have re-worded this in the description above. I agree that there is a possibility that not all the tasks will end up reading from partitions 4, 5, 6 and 7.

*Each task has its own spout instance (and corresponding KafkaConsumer), so only task 10 is actually assigned those partitions at the start of the log. I'm not sure how task 3 is even emitting anything?*
Shouldn't 3 more tasks (from Executor1), apart from task ID 10, be assigned to read from partitions 4, 5, 6 and 7? As I remember reading in the docs, if there are more tasks than partitions, there should be a one-to-one mapping between tasks and partitions.
Hence it is not surprising to me that task 3 is reading from a Kafka partition, although it does not seem to read from its assigned Kafka partitions (according to the {{Setting newly assigned}} log line).

*I also noticed that many tasks are emitting what appears to be the same message?*
{noformat}
e.g.
2017-05-03 13:50:47.655 o.a.s.d.task Thread-8-kafkaspout-executor[11 18] [INFO] Emitting: kafkaspout 18 default [8topic, 2, 0, null, 1]
2017-05-03 13:50:47.658 o.a.s.d.task Thread-8-kafkaspout-executor[11 18] [INFO] Emitting: kafkaspout 11 default [8topic, 2, 0, null, 1]
2017-05-03 13:50:47.660 o.a.s.d.task Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 7 default [8topic, 2, 0, null, 1]
2017-05-03 13:50:47.670 o.a.s.d.task Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 8 default [8topic, 2, 0, null, 1]
{noformat}
Are you deriving this information from the fact that all of the above messages have the same message ID (0)?

*Is the start of the log the first occurrence of partition assignments in the log?*
No, I took a random segment from the worker logs. I am attaching the full worker.log file to this ticket.

*Are you using automatic or manual subscription to Kafka?*
I did not understand the question. I am running Kafka on my local machine and using a KafkaSpout to read a topic from this local instance of Kafka.

> Incorrect logs for mapping between Kafka partitions and task IDs
> ----------------------------------------------------------------
>
>                 Key: STORM-2514
>                 URL: https://issues.apache.org/jira/browse/STORM-2514
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>            Reporter: Srishty Agrawal
>         Attachments: worker.log
>
>
> While working on
> [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker
> logs were generated with debug mode on. The information printed about mapping
> between Task IDs and kafka partitions was contradictory to my assumptions. I
> ran a topology which used KafkaSpout from the storm-kafka-client module; it
> had a parallelism hint of 2 (number of executors) and a total of 16 tasks.
> The log lines mentioned below show assigned mapping between executors and
> kafka partitions:
> {noformat}
> o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO]
> Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7]
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions
> reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce,
> topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
> o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO]
> Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0]
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions
> reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126,
> topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
> {noformat}
> It is evident that only tasks (with ID 3, 4, 5, 6, 7, 8, 9, 10) in Executor1
> (3 10) will be reading from kafka partitions 4, 5, 6 and 7. Similarly, tasks
> in Executor2 (11 18) will be reading from kafka partitions 0, 1, 2 and 3.
> These log lines are being printed by Tasks with IDs 10 and 15 in respective
> executors.
> Logs which emit individual messages do not abide by the above assumption. For
> example in the log mentioned below, Task ID 3 (added code, as a part of
> debugging STORM-2506, to print the Task ID right next to component ID) which
> runs on Executor1 reads from partition 2 (the second value inside the square
> brackets), instead of 4, 5, 6 or 7.
> {noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3
> default [8topic, 2, 0, null, 1]{noformat}
> This behavior has been summarized in the table below:
> {noformat}
> Task IDs ------- 3, 4, 7, 8, 9, 11, 15, 18 ------------ Partitions 0, 1, 2, 3
> Task IDs ------- 5, 6, 10, 12, 13, 14, 16, 17 --------- Partition 4, 5, 6, 7
> {noformat}
> [You can find the relevant parts of log file
> here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
>
> Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16,
> 17}} correspond to executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} correspond to
> executor2? Are (3 10) not the starting and ending task IDs in Executor1?
> Another interesting thing to note is that Task IDs 10 and 15 are always
> reading from the partitions they claimed to be reading from (while setting
> partition assignments).
> If my assumptions are correct, there is a bug in the way the mapping
> information is being passed to worker logs. If not, we need to make changes
> in our docs.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
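
For anyone who wants to re-derive the task-to-partition table from worker.log, a minimal sketch follows. It assumes the {{Emitting: kafkaspout <taskId> default [topic, partition, ...]}} line format quoted in this thread, where the task ID is only printed because of the debug patch mentioned in the description. The function names and the {{ASSIGNED}} mapping are hypothetical helpers, and the mapping encodes the description's assumption that an assignment applies to the whole executor, which may not hold.

```python
import re
from collections import defaultdict

# Matches the emit lines quoted in this thread, for example:
#   Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 7 default [8topic, 2, 0, null, 1]
# Groups: executor first task, executor last task, emitting task ID, partition.
EMIT_RE = re.compile(
    r"executor\[(\d+) (\d+)\] \[INFO\] Emitting: kafkaspout (\d+) default "
    r"\[[^,\]]+, (\d+),"
)


def parse_emits(lines):
    """Yield (first_task, last_task, task_id, partition) for each emit line."""
    for line in lines:
        match = EMIT_RE.search(line)
        if match:
            yield tuple(int(group) for group in match.groups())


def partitions_by_task(lines):
    """Return {task_id: set of partitions that task was logged emitting from}."""
    by_task = defaultdict(set)
    for _first, _last, task, partition in parse_emits(lines):
        by_task[task].add(partition)
    return dict(by_task)


def unexpected_reads(lines, assigned):
    """Return sorted (task_id, partition) pairs outside the executor's assumed assignment.

    `assigned` maps an executor's (first_task, last_task) range to a set of
    partitions. Treating the assignment as executor-wide is an assumption.
    """
    unexpected = set()
    for first, last, task, partition in parse_emits(lines):
        if partition not in assigned.get((first, last), set()):
            unexpected.add((task, partition))
    return sorted(unexpected)


# Two of the log lines quoted in the comment above.
SAMPLE = [
    "2017-05-03 13:50:47.655 o.a.s.d.task Thread-8-kafkaspout-executor[11 18] "
    "[INFO] Emitting: kafkaspout 18 default [8topic, 2, 0, null, 1]",
    "2017-05-03 13:50:47.660 o.a.s.d.task Thread-12-kafkaspout-executor[3 10] "
    "[INFO] Emitting: kafkaspout 7 default [8topic, 2, 0, null, 1]",
]

# Assignments as read from the "Setting newly assigned partitions" log lines.
ASSIGNED = {(3, 10): {4, 5, 6, 7}, (11, 18): {0, 1, 2, 3}}

if __name__ == "__main__":
    print(partitions_by_task(SAMPLE))
    print(unexpected_reads(SAMPLE, ASSIGNED))
```

On the two sample lines, task 7 (logged on the thread for executor (3 10)) shows up as emitting from partition 2, which is the kind of mismatch the description reports. To run it on a full log, pass {{open("worker.log")}} as {{lines}}.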