[ https://issues.apache.org/jira/browse/SAMZA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yi Pan (Data Infrastructure) updated SAMZA-920: ----------------------------------------------- Assignee: Ivan Simoneko > BrokerProxy.abdicateAll can get stuck on adding and removing the same > partitions infinitely > ------------------------------------------------------------------------------------------- > > Key: SAMZA-920 > URL: https://issues.apache.org/jira/browse/SAMZA-920 > Project: Samza > Issue Type: Bug > Components: kafka > Affects Versions: 0.9.1, 0.10.1 > Reporter: Ivan Simoneko > Assignee: Ivan Simoneko > Labels: easyfix > Attachments: SAMZA-920_v1.patch > > > abdicateAll is iterating over ConcurrentHashMap nextOffsets, removing and > adding back elements which can result in an infinite iteration. > {code} > 2016-03-20 20:25:41,413 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Abdicating for > [<TOPIC_PARTITION_1>]. > 2016-03-20 20:25:41,414 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Refreshing brokers for: > Map([<TOPIC_PARTITION_1>] -> 20749911) > 2016-03-20 20:25:41,414 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Adding new topic and partition > [<TOPIC_PARTITION_1>] to queue for <HOST> > 2016-03-20 20:25:41,414 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.GetOffset - Validating offset 20749911 for > topic and partition [<TOPIC_PARTITION_1>] > 2016-03-20 20:25:41,428 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.GetOffset - Able to successfully read from > offset 20749911 for topic and partition [<TOPIC_PARTITION_1>]. Using it to > instantiate consumer. > 2016-03-20 20:25:41,428 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Got offset 20749911 for new topic > and partition [<TOPIC_PARTITION_1>]. > 2016-03-20 20:25:41,428 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Tried to start an already started > broker proxy (BrokerProxy for <HOST>:<PORT>). Ignoring. > 2016-03-20 20:25:41,429 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Claimed topic-partition > ([<TOPIC_PARTITION_1>]) for (BrokerProxy for <HOST>:<PORT>) > 2016-03-20 20:25:41,429 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Removed [<TOPIC_PARTITION_2>] > 2016-03-20 20:25:41,429 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Abdicating for > [<TOPIC_PARTITION_2>]. > 2016-03-20 20:25:41,429 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Refreshing brokers for: > Map([<TOPIC_PARTITION_2>] -> 20749909) > 2016-03-20 20:25:41,429 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Adding new topic and partition > [<TOPIC_PARTITION_2>] to queue for <HOST> > 2016-03-20 20:25:41,429 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.GetOffset - Validating offset 20749909 for > topic and partition [<TOPIC_PARTITION_2>] > 2016-03-20 20:25:41,444 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.GetOffset - Able to successfully read from > offset 20749909 for topic and partition [<TOPIC_PARTITION_2>]. Using it to > instantiate consumer. > 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Got offset 20749909 for new topic > and partition [<TOPIC_PARTITION_2>]. > 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Tried to start an already started > broker proxy (BrokerProxy for <HOST>:<PORT>). Ignoring. > 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Claimed topic-partition > ([<TOPIC_PARTITION_2>]) for (BrokerProxy for <HOST>:<PORT>) > 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Removed [<TOPIC_PARTITION_1>] > 2016-03-20 20:25:41,444 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Abdicating for > [<TOPIC_PARTITION_1>]. > 2016-03-20 20:25:41,444 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Refreshing brokers for: > Map([<TOPIC_PARTITION_1>] -> 20749911) > 2016-03-20 20:25:41,445 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Adding new topic and partition > [<TOPIC_PARTITION_1>] to queue for <HOST> > 2016-03-20 20:25:41,445 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.GetOffset - Validating offset 20749911 for > topic and partition [<TOPIC_PARTITION_1>] > 2016-03-20 20:25:41,460 INFO [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.GetOffset - Able to successfully read from > offset 20749911 for topic and partition [<TOPIC_PARTITION_1>]. Using it to > instantiate consumer. > 2016-03-20 20:25:41,460 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Got offset 20749911 for new topic > and partition [<TOPIC_PARTITION_1>]. > 2016-03-20 20:25:41,460 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.BrokerProxy - Tried to start an already started > broker proxy (BrokerProxy for <HOST>:<PORT>). Ignoring. > 2016-03-20 20:25:41,460 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed > at <HOST>:<PORT> for client <CLIENT_ID>] > org.apache.samza.system.kafka.KafkaSystemConsumer - Claimed topic-partition > ([<TOPIC_PARTITION_1>]) for (BrokerProxy for <HOST>:<PORT>) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)