[ 
https://issues.apache.org/jira/browse/SAMZA-590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated SAMZA-590:
----------------------------------
    Attachment: SAMZA-590-0.patch

Attaching patch. RB at:

https://reviews.apache.org/r/31909

# Moved the "abdicate" method to be a top-level method within the class.
# Added an abdicateAll method that abdicates all TopicAndPartitions.
# Changed the BrokerProxy to invoke abdicateAll whenever a consumer failure occurs.
# Wrote a unit test to validate that the BrokerProxy relinquishes ownership of 
all TopicAndPartitions on failure, and that it continues to refresh dropped 
TopicAndPartitions (in case its broker comes alive again) afterwards.
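
The release-on-failure pattern in the list above can be sketched roughly as follows. This is only an illustration in Java, not the code in the attached patch: ProxySketch, fetchOnce, the String partition names, and the callback are all hypothetical stand-ins, and the real BrokerProxy is written in Scala and works with TopicAndPartition objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for a per-broker fetcher; NOT the Samza BrokerProxy class.
class ProxySketch {
    // Partitions this proxy currently owns, e.g. "topic1-14" (assumed naming).
    private final Set<String> owned = new LinkedHashSet<>();
    // Called once per relinquished partition so the caller can look up the new leader.
    private final Consumer<String> onAbdicate;

    ProxySketch(Consumer<String> onAbdicate) {
        this.onAbdicate = onAbdicate;
    }

    void addPartition(String tp) {
        owned.add(tp);
    }

    // Returns a copy so callers cannot mutate the internal set.
    Set<String> ownedPartitions() {
        return new LinkedHashSet<>(owned);
    }

    // Give up a single partition and notify the callback.
    void abdicate(String tp) {
        if (owned.remove(tp)) {
            onAbdicate.accept(tp);
        }
    }

    // Give up every partition; iterate over a copy because abdicate mutates the set.
    void abdicateAll() {
        for (String tp : new ArrayList<>(owned)) {
            abdicate(tp);
        }
    }

    // One fetch attempt: on failure, release everything instead of holding on
    // to partitions that another broker may now lead.
    void fetchOnce(Runnable fetch) {
        try {
            fetch.run();
        } catch (RuntimeException e) {
            abdicateAll();
        }
    }
}
```

In this sketch, a caller would pass a callback that re-resolves the leader for each released partition, which corresponds to the "Abdicating for" and "Refreshing brokers for" lines in the log below.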

I also ran this code in one of our jobs on our unstable cluster (the one we're 
bouncing, as described in the issue description), and found that the BrokerProxy 
properly relinquished ownership.

{noformat}
2015-03-10 17:42:31 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket 
error: java.io.EOFException: Received -1 when reading from channel, socket has 
likely been closed.
2015-03-10 17:42:31 BrokerProxy [WARN] Restarting consumer due to 
java.nio.channels.ClosedChannelException. Releasing ownership of all 
partitions, and restarting consumer. Turn on debugging to get a full stack 
trace.
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Abdicating for [topic1,14]
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 VerifiableProperties [INFO] Verifying properties
2015-03-10 17:42:31 VerifiableProperties [INFO] Property client.id is 
overridden to samza_consumer-job_name-i001-1425999995883-2
2015-03-10 17:42:31 VerifiableProperties [INFO] Property metadata.broker.list 
is overridden to broker-vip:10251
2015-03-10 17:42:31 VerifiableProperties [INFO] Property request.timeout.ms is 
overridden to 60000
2015-03-10 17:42:31 ClientUtils$ [INFO] Fetching metadata from broker 
id:0,host:broker-vip,port:10251 with correlation id 1 for 1 topic(s) Set(topic1)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 SyncProducer [INFO] Connected to broker-vip:10251 for 
producing
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 SyncProducer [INFO] Disconnecting from broker-vip:10251
2015-03-10 17:42:31 BrokerProxy [INFO] Creating new SimpleConsumer for host 
broker1:10251 for system kafka
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 GetOffset [INFO] Validating offset 18198544 for topic and 
partition [topic1,14]
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:31 KafkaSystemConsumer [INFO] Refreshing brokers for: 
Map([topic1,14] -> 18198544)
2015-03-10 17:42:32 GetOffset [INFO] Able to successfully read from offset 
18198544 for topic and partition [topic1,14]. Using it to instantiate consumer.
2015-03-10 17:42:32 BrokerProxy [INFO] Starting BrokerProxy for broker1:10251
2015-03-10 17:42:32 BrokerProxy [INFO] Creating new SimpleConsumer for host 
broker2:10251 for system kafka
{noformat}

> Dead Kafka broker ignores new leader
> ------------------------------------
>
>                 Key: SAMZA-590
>                 URL: https://issues.apache.org/jira/browse/SAMZA-590
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-590-0.patch
>
>
> We recently discovered a bug in BrokerProxy, where the fetcher thread will 
> continue trying to fetch from a dead leader, even if a new leader has been 
> elected.
> We were doing a slow rolling bounce of one of our Kafka clusters, where we 
> take a broker offline, do extended maintenance on the machine, then bring the 
> broker back up. We observed that when the broker was taken offline, and the 
> other brokers took over leadership for its partitions, the BrokerProxy thread 
> never saw this update.
> The containers logged this every 10s:
> {noformat}
> 2015-03-09 19:14:19 BrokerProxy [WARN] Restarting consumer due to 
> java.nio.channels.ClosedChannelException. Turn on debugging to get a full 
> stack trace.
> 2015-03-09 19:14:19 BrokerProxy [DEBUG] Exception detail:
> java.nio.channels.ClosedChannelException
>       at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>       at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>       at 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>       at 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>       at 
> org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:174)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:145)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:132)
>       at 
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:131)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The problem appears to be that the BrokerProxy thread never lets go of 
> ownership of its TopicAndPartitions when a consumer failure occurs. It just 
> tries to reconnect and fetch again. If the broker is down for a while, this 
> results in the thread owning TopicAndPartitions that are now led by other 
> brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
