Re: Kafka 0.9.0.1 partitions shrink and expand frequently after restart the broker

2017-11-19 Thread Json Tu
Can someone help analyze this?

> On Nov 10, 2017, at 11:08 AM, Json Tu wrote:
> 
> I'm sorry for my poor English.
> 
> What I really mean is that the broker machine has 8 cores and 16 GB of memory, but my 
> JVM configuration is as below:
> java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
> -Djava.awt.headless=true -Xloggc:/xx/yy/kafkaServer-gc.log -verbose:gc 
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=10M 
> -XX:+HeapDumpOnOutOfMemoryError.
> 
> We have 30+ clusters with this JVM configuration, all deployed on machines with 
> 8 cores and 16 GB of memory. Compared with the other clusters, the current 
> cluster has more than 5 times as many partitions.
> When we restart brokers in the other clusters, there is no such phenomenon.
> 
> Maybe some metrics or logs can lead us to the root cause of this phenomenon.
> Looking forward to more suggestions.
> 
> 
>> On Nov 9, 2017, at 9:59 PM, John Yost wrote:
>> 
>> I've seen this before, and it was due to long GC pauses caused in large part by
>> a memory heap > 8 GB.
>> 
>> --John
>> 
>> On Thu, Nov 9, 2017 at 8:17 AM, Json Tu  wrote:
>> 
>>> Hi,
>>>   we have a Kafka cluster made up of 6 brokers, with 8 CPUs and
>>> 16 GB of memory on each broker's machine, and we have about 1600 topics in the
>>> cluster, with about 1700 partition leaders and 1600 partition replicas on each
>>> broker.
>>>   When we restart a normal broker, we find that 500+ partitions
>>> shrink and expand their ISR frequently after the restart;
>>> there are many logs as below.
>>> 
>>>  [2017-11-09 17:05:51,173] INFO Partition [Yelp,5] on broker 4759726:
>>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>>> (kafka.cluster.Partition)
>>> [2017-11-09 17:06:22,047] INFO Partition [Yelp,5] on broker 4759726:
>>> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
>>> (kafka.cluster.Partition)
>>> [2017-11-09 17:06:28,634] INFO Partition [Yelp,5] on broker 4759726:
>>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>>> (kafka.cluster.Partition)
>>> [2017-11-09 17:06:44,658] INFO Partition [Yelp,5] on broker 4759726:
>>> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
>>> (kafka.cluster.Partition)
>>> [2017-11-09 17:06:47,611] INFO Partition [Yelp,5] on broker 4759726:
>>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>>> (kafka.cluster.Partition)
>>> [2017-11-09 17:07:19,703] INFO Partition [Yelp,5] on broker 4759726:
>>> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
>>> (kafka.cluster.Partition)
>>> [2017-11-09 17:07:26,811] INFO Partition [Yelp,5] on broker 4759726:
>>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>>> (kafka.cluster.Partition)
>>> …
>>> 
>>> 
>>>   The shrink and expand repeats after 30 minutes, which is the default
>>> value of leader.imbalance.check.interval.seconds; at that time
>>> we can also see the controller's auto-rebalance log, which moves some
>>> partition leaders back to this restarted broker.
>>>   We see no shrink and expand while the cluster is running normally, only when
>>> we restart a broker, so replica.fetch.thread.num is 1 and it seems enough.
>>> 
>>>   We can reproduce it on every restart. Can someone give some suggestions?
>>> Thanks in advance.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
> 



Re: Kafka 0.9.0.1 partitions shrink and expand frequently after restart the broker

2017-11-09 Thread Json Tu
The broker with broker id 4759750 had just been restarted, and 500+ replica 
partitions shrink and expand frequently; their leader partitions are 
distributed across the other 5 brokers. The log was pulled from one broker, and 
only the entries related to one partition are shown.

> On Nov 10, 2017, at 12:06 PM, Hu Xi wrote:
> 
> Seems broker `4759750` was always removed for partition [Yelp, 5] every round 
> of ISR shrinking. Did you check if everything works alright for this broker?
> 
> 
> From: Json Tu
> Sent: November 10, 2017 11:08
> To: users@kafka.apache.org
> Cc: d...@kafka.apache.org; Guozhang Wang
> Subject: Re: Kafka 0.9.0.1 partitions shrink and expand frequently after restart 
> the broker
>  
> I'm sorry for my poor English.
> 
> What I really mean is that the broker machine has 8 cores and 16 GB of memory, but my 
> JVM configuration is as below:
> java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
> -Djava.awt.headless=true -Xloggc:/xx/yy/kafkaServer-gc.log -verbose:gc 
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=10M 
> -XX:+HeapDumpOnOutOfMemoryError.
> 
> We have 30+ clusters with this JVM configuration, all deployed on machines with 
> 8 cores and 16 GB of memory. Compared with the other clusters, the current 
> cluster has more than 5 times as many partitions.
> When we restart brokers in the other clusters, there is no such phenomenon.
> 
> Maybe some metrics or logs can lead us to the root cause of this phenomenon.
> Looking forward to more suggestions.
> 
> 
> > On Nov 9, 2017, at 9:59 PM, John Yost wrote:
> > 
> > I've seen this before, and it was due to long GC pauses caused in large part by
> > a memory heap > 8 GB.
> > 
> > --John
> > 
> > On Thu, Nov 9, 2017 at 8:17 AM, Json Tu  wrote:
> > 
> >> Hi,
> >> we have a Kafka cluster made up of 6 brokers, with 8 CPUs and
> >> 16 GB of memory on each broker's machine, and we have about 1600 topics in the
> >> cluster, with about 1700 partition leaders and 1600 partition replicas on each
> >> broker.
> >> When we restart a normal broker, we find that 500+ partitions
> >> shrink and expand their ISR frequently after the restart;
> >> there are many logs as below.
> >> 
> >>   [2017-11-09 17:05:51,173] INFO Partition [Yelp,5] on broker 4759726:
> >> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
> >> (kafka.cluster.Partition)
> >> [2017-11-09 17:06:22,047] INFO Partition [Yelp,5] on broker 4759726:
> >> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
> >> (kafka.cluster.Partition)
> >> [2017-11-09 17:06:28,634] INFO Partition [Yelp,5] on broker 4759726:
> >> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
> >> (kafka.cluster.Partition)
> >> [2017-11-09 17:06:44,658] INFO Partition [Yelp,5] on broker 4759726:
> >> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
> >> (kafka.cluster.Partition)
> >> [2017-11-09 17:06:47,611] INFO Partition [Yelp,5] on broker 4759726:
> >> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
> >> (kafka.cluster.Partition)
> >> [2017-11-09 17:07:19,703] INFO Partition [Yelp,5] on broker 4759726:
> >> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
> >> (kafka.cluster.Partition)
> >> [2017-11-09 17:07:26,811] INFO Partition [Yelp,5] on broker 4759726:
> >> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
> >> (kafka.cluster.Partition)
> >> …
> >> 
> >> 
> >> The shrink and expand repeats after 30 minutes, which is the default
> >> value of leader.imbalance.check.interval.seconds; at that time
> >> we can also see the controller's auto-rebalance log, which moves some
> >> partition leaders back to this restarted broker.
> >> We see no shrink and expand while the cluster is running normally, only when
> >> we restart a broker, so replica.fetch.thread.num is 1 and it seems enough.
> >> 
> >> We can reproduce it on every restart. Can someone give some suggestions?
> >> Thanks in advance.
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 



Re: Kafka 0.9.0.1 partitions shrink and expand frequently after restart the broker

2017-11-09 Thread Json Tu
I'm sorry for my poor English.

What I really mean is that the broker machine has 8 cores and 16 GB of memory, but my 
JVM configuration is as below:
java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
-Djava.awt.headless=true -Xloggc:/xx/yy/kafkaServer-gc.log -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=10M 
-XX:+HeapDumpOnOutOfMemoryError.

We have 30+ clusters with this JVM configuration, all deployed on machines with 
8 cores and 16 GB of memory. Compared with the other clusters, the current cluster 
has more than 5 times as many partitions.
When we restart brokers in the other clusters, there is no such phenomenon.

Maybe some metrics or logs can lead us to the root cause of this phenomenon.
Looking forward to more suggestions.


> On Nov 9, 2017, at 9:59 PM, John Yost wrote:
> 
> > I've seen this before, and it was due to long GC pauses caused in large part by
> > a memory heap > 8 GB.
> 
> --John
> 
> On Thu, Nov 9, 2017 at 8:17 AM, Json Tu  wrote:
> 
>> Hi,
>> we have a Kafka cluster made up of 6 brokers, with 8 CPUs and
>> 16 GB of memory on each broker's machine, and we have about 1600 topics in the
>> cluster, with about 1700 partition leaders and 1600 partition replicas on each
>> broker.
>> When we restart a normal broker, we find that 500+ partitions
>> shrink and expand their ISR frequently after the restart;
>> there are many logs as below.
>> 
>>   [2017-11-09 17:05:51,173] INFO Partition [Yelp,5] on broker 4759726:
>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>> (kafka.cluster.Partition)
>> [2017-11-09 17:06:22,047] INFO Partition [Yelp,5] on broker 4759726:
>> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
>> (kafka.cluster.Partition)
>> [2017-11-09 17:06:28,634] INFO Partition [Yelp,5] on broker 4759726:
>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>> (kafka.cluster.Partition)
>> [2017-11-09 17:06:44,658] INFO Partition [Yelp,5] on broker 4759726:
>> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
>> (kafka.cluster.Partition)
>> [2017-11-09 17:06:47,611] INFO Partition [Yelp,5] on broker 4759726:
>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>> (kafka.cluster.Partition)
>> [2017-11-09 17:07:19,703] INFO Partition [Yelp,5] on broker 4759726:
>> Shrinking ISR for partition [Yelp,5] from 4759726,4759750 to 4759726
>> (kafka.cluster.Partition)
>> [2017-11-09 17:07:26,811] INFO Partition [Yelp,5] on broker 4759726:
>> Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750
>> (kafka.cluster.Partition)
>> …
>> 
>> 
>> The shrink and expand repeats after 30 minutes, which is the default
>> value of leader.imbalance.check.interval.seconds; at that time
>> we can also see the controller's auto-rebalance log, which moves some
>> partition leaders back to this restarted broker.
>> We see no shrink and expand while the cluster is running normally, only when
>> we restart a broker, so replica.fetch.thread.num is 1 and it seems enough.
>> 
>> We can reproduce it on every restart. Can someone give some suggestions?
>> Thanks in advance.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 



Kafka 0.9.0.1 partitions shrink and expand frequently after restart the broker

2017-11-09 Thread Json Tu
Hi,
we have a Kafka cluster made up of 6 brokers, with 8 CPUs and 16 GB of 
memory on each broker's machine, and we have about 1600 topics in the 
cluster, with about 1700 partition leaders and 1600 partition replicas on each 
broker.
When we restart a normal broker, we find that 500+ partitions 
shrink and expand their ISR frequently after the restart;
there are many logs as below.

   [2017-11-09 17:05:51,173] INFO Partition [Yelp,5] on broker 4759726: 
Expanding ISR for partition [Yelp,5] from 4759726 to 4759726,4759750 
(kafka.cluster.Partition)
[2017-11-09 17:06:22,047] INFO Partition [Yelp,5] on broker 4759726: Shrinking 
ISR for partition [Yelp,5] from 4759726,4759750 to 4759726 
(kafka.cluster.Partition)
[2017-11-09 17:06:28,634] INFO Partition [Yelp,5] on broker 4759726: Expanding 
ISR for partition [Yelp,5] from 4759726 to 4759726,4759750 
(kafka.cluster.Partition)
[2017-11-09 17:06:44,658] INFO Partition [Yelp,5] on broker 4759726: Shrinking 
ISR for partition [Yelp,5] from 4759726,4759750 to 4759726 
(kafka.cluster.Partition)
[2017-11-09 17:06:47,611] INFO Partition [Yelp,5] on broker 4759726: Expanding 
ISR for partition [Yelp,5] from 4759726 to 4759726,4759750 
(kafka.cluster.Partition)
[2017-11-09 17:07:19,703] INFO Partition [Yelp,5] on broker 4759726: Shrinking 
ISR for partition [Yelp,5] from 4759726,4759750 to 4759726 
(kafka.cluster.Partition)
[2017-11-09 17:07:26,811] INFO Partition [Yelp,5] on broker 4759726: Expanding 
ISR for partition [Yelp,5] from 4759726 to 4759726,4759750 
(kafka.cluster.Partition)
…


The shrink and expand repeats after 30 minutes, which is the default value of 
leader.imbalance.check.interval.seconds; at that time
we can also see the controller's auto-rebalance log, which moves some 
partition leaders back to this restarted broker.
We see no shrink and expand while the cluster is running normally, only when we 
restart a broker, so replica.fetch.thread.num is 1 and it seems enough.

We can reproduce it on every restart. Can someone give some suggestions? 
Thanks in advance.
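
For context, a minimal sketch of the broker settings usually looked at when ISR membership flaps right after a restart. The names are real 0.9 broker configs; the values below are illustrative only and are not taken from this cluster:

replica.lag.time.max.ms=10000       # follower must keep fetching within this window to stay in the ISR (0.9 default)
num.replica.fetchers=4              # more fetcher threads help a restarted broker catch up on many partitions at once
replica.fetch.wait.max.ms=500       # max wait of a replica fetch request (0.9 default)
auto.leader.rebalance.enable=false  # optionally stop the controller from moving leaders back automatically;
                                    # preferred-leader election can then be run manually once the ISR is stable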









Get Broker metrics timeout

2017-10-22 Thread Json Tu
Hi all,
we have a cluster with 10 brokers running Kafka 0.9.0.1. To monitor the cluster, we 
poll metric data, such as the OfflinePartitionsCount metric, from each broker 
every 2 minutes.
However, occasional timeouts occur when we fetch data from some brokers, which 
leads to false alarms.
For example, we may get an exception such as the one below.
error: Failed to retrieve RMIServer stub: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 10.11.12.13; nested 
exception is: 
java.net.ConnectException: Connection timed out]

We find that our TcpExt.TCPBacklogDrop counter fluctuates repeatedly; maybe 
this is the root cause. If it is the problem, how can I tune it?

Any suggestion is appreciated. Thanks in advance.
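
One way to keep a single slow RMI handshake from raising an alarm is to bound the JMX connect with a timeout and only alert after a retry. Below is a rough Scala sketch; the MBean name, port and executor choice are assumptions and not taken from this thread:

import java.util.concurrent.Executors
import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object BrokerMetricProbe {
  // run the blocking RMI connect on its own thread so it can be bounded with Await
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

  // returns Failure on timeout or connection refusal instead of hanging the poll loop
  def offlinePartitions(host: String, port: Int, timeout: FiniteDuration): Try[Int] = Try {
    val url = new JMXServiceURL(s"service:jmx:rmi:///jndi/rmi://$host:$port/jmxrmi")
    val connector = Await.result(Future(JMXConnectorFactory.connect(url)), timeout)
    try {
      val name = new ObjectName("kafka.controller:type=KafkaController,name=OfflinePartitionsCount")
      connector.getMBeanServerConnection.getAttribute(name, "Value").asInstanceOf[Number].intValue()
    } finally connector.close()
  }
}

// e.g. alert only if two consecutive probes fail:
// val bad = (1 to 2).forall(_ => BrokerMetricProbe.offlinePartitions("10.11.12.13", 9999, 10.seconds).isFailure)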




Question about KIP-73 pr merge

2016-12-20 Thread Json Tu
Hi all, 
We are running Kafka 0.9.0 in our production environment. We added one 
broker to the cluster and executed a partition reassignment across all brokers, and we 
found that our network and disk I/O became very high.
I know KIP-73 has resolved this problem, but I wonder whether I can merge it into my 
Kafka 0.9.0 directly, because I have two doubts:
1. https://github.com/apache/kafka/pull/1776 
was created against trunk, 
and Kafka 0.9.0 has diverged from trunk.
2. Because of the difference between Kafka 0.9.0 and trunk, there may be other 
bugfixes associated with this PR; how can I avoid that risk?
 
Any suggestion is appreciated, thanks in advance.
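
For what it is worth, the throttle that KIP-73 eventually shipped in 0.10.1 is driven by a handful of quota configs. The sketch below only illustrates what the backport would buy; none of these exist in a stock 0.9.0 broker, and the rate is an arbitrary example:

# set by kafka-reassign-partitions.sh --throttle <bytes/sec> in 0.10.1+ (not available in 0.9.0)
leader.replication.throttled.rate=50000000      # per-broker cap on outbound replication traffic
follower.replication.throttled.rate=50000000    # per-broker cap on inbound replication traffic
leader.replication.throttled.replicas=...       # per-topic list of the replicas being moved
follower.replication.throttled.replicas=...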

Re: log.flush.interval.messages setting of Kafka 0.9.0.0

2016-12-18 Thread Json Tu
Would be grateful to hear opinions from experts out there. Thanks in advance


> On Dec 16, 2016, at 6:17 PM, Json Tu wrote:
> 
> Hi all,
>   we have a 0.9.0.0 cluster with 3 nodes and a topic with 3 
> replicas, produced with acks=-1; our average send latency is 7 ms. I plan 
> to optimize the cluster's performance by adjusting some parameters.
> We find our brokers have the following config item set:
>   log.flush.interval.messages=1
> All other relevant parameters are at their defaults, and the default value of 
> log.flush.interval.messages is Long.MAX_VALUE; setting it to 1 forces a flush on 
> every message, which may hurt performance. I wonder whether I can remove this 
> config item's setting and fall back to the default value.
> 
>   I think using the default value may have two drawbacks:
>   1. The recovery checkpoint cannot be updated, so when loading 
> segments the broker will scan them from beginning to end.
>   2. Data may be lost when the leader partition's broker VM 
> restarts, but I think 3 replicas can remedy this drawback if the network 
> between them is good.
> 
>   Any suggestions? Thank you.




log.flush.interval.messages setting of Kafka 0.9.0.0

2016-12-16 Thread Json Tu
Hi all,
we have a 0.9.0.0 cluster with 3 nodes and a topic with 3 
replicas, produced with acks=-1; our average send latency is 7 ms. I plan to 
optimize the cluster's performance by adjusting some parameters.
We find our brokers have the following config item set:
log.flush.interval.messages=1
All other relevant parameters are at their defaults, and the default value of 
log.flush.interval.messages is Long.MAX_VALUE; setting it to 1 forces a flush on 
every message, which may hurt performance. I wonder whether I can remove this 
config item's setting and fall back to the default value.

I think using the default value may have two drawbacks:
1. The recovery checkpoint cannot be updated, so when loading 
segments the broker will scan them from beginning to end.
2. Data may be lost when the leader partition's broker VM 
restarts, but I think 3 replicas can remedy this drawback if the network between 
them is good.

Any suggestions? Thank you.
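
If it helps, here is a minimal sketch of the "use the defaults and lean on replication" setup on a 3-replica cluster with acks=-1. The config names exist in 0.9; the values are only illustrative, not a recommendation for this specific cluster:

# log.flush.interval.messages and log.flush.interval.ms left unset, so flushing is
# left to the OS page cache and the periodic recovery-point checkpoint
min.insync.replicas=2                  # with acks=-1, a write must reach 2 of the 3 replicas
unclean.leader.election.enable=false   # never elect an out-of-sync replica as leader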


Re: A strange controller log in Kafka 0.9.0.1

2016-12-01 Thread Json Tu
Hi,
Can someone help review the PR in the JIRA below?
https://issues.apache.org/jira/browse/KAFKA-4447

> On Nov 23, 2016, at 11:28 PM, Json Tu wrote:
> 
> Hi,
>   We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a strange 
> controller log as below.
> 
> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK 
> expired; shut down all controller components and try to re-elect 
> (kafka.controller.KafkaController$SessionExpirationListener)
> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning, 
> broker id 100 (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering 
> IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped  
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown completed 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller 100]: 
> Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]: 
> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread], 
> Shutting down (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
> Stopped  (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
> Shutdown completed (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread], 
> Shutting down (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
> Stopped  (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
> Shutdown completed (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as the 
> controller (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!! 
> (kafka.controller.IsrChangeNotificationListener)
> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]: 
> Broker change listener fired for path /brokers/ids with children 101,100 
> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete topics 
> listener fired for topics  to be deleted 
> (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>  for path /brokers/topics/movie.gateway.merselllog.syncCinema 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>  for path /brokers/topics/push_3rdparty_high 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
>  for path /brokers/topics/icb_msg_push_high_02 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,715] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,100],"5":[100,101],"1":[102,101],"0":[101,100],"2":[100,102],"3":[101,102]}}
>  for path /brokers/topics/movie.gateway.merselllog.unlockSeat 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> 
> 
>   From the log we can see that the old controller 100 resigned as the 
> controller successfully, but what confuses me is that it still received a 
> Fired!!! event from the IsrChangeNotificationListener, which had been de-registered 
> before,
> and broker 100 was not elected as the new controller afterwards. Yet we can 
> see IsrChangeNotificationListener, DeleteTopicsListener and AddPartitionsListener 
> all fired after the resignation. Does this mean something is wrong with ZooKeeper?
>   Any suggestion is appreciated, thanks in advance.
> 
> 



Re: A strange controller log in Kafka 0.9.0.1

2016-11-28 Thread Json Tu
Thanks to Jason Gustafson; I hope more contributors can take part in this 
discussion.
https://issues.apache.org/jira/browse/KAFKA-4447

> On Nov 27, 2016, at 9:20 PM, Json Tu wrote:
> 
> Anybody? This is very disconcerting! If convenient, can somebody help to 
> confirm this strange issue?
> 
>> On Nov 26, 2016, at 1:35 AM, Json Tu wrote:
>> 
>> Thanks Guozhang,
>>  if it's convenient, can we discuss it in the JIRA 
>> https://issues.apache.org/jira/browse/KAFKA-4447 ?
>> I guess somebody else may also encounter this problem.
>> 
>>> On Nov 25, 2016, at 12:31 PM, Guozhang Wang wrote:
>>> 
>>> Does broker 100 keep acting as the controller afterwards? What you observe
>>> is possible and should be transient since "unsubscribeChildChanges" on
>>> ZkClient and listener fired procedure are executed on different threads and
>>> they are not strictly synchronized. But if you continuously see broker
>>> 100's listener fires and it acts like a controller then there may be an
>>> issue with 0.9.0.1 version.
>>> 
>>> Guozhang
>>> 
>>> On Wed, Nov 23, 2016 at 7:28 AM, Json Tu  wrote:
>>> 
>>>> Hi,
>>>>  We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a
>>>> strange controller log as below.
>>>> 
>>>> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK
>>>> expired; shut down all controller components and try to re-elect
>>>> (kafka.controller.KafkaController$SessionExpirationListener)
>>>> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning,
>>>> broker id 100 (kafka.controller.KafkaController)
>>>> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering
>>>> IsrChangeNotificationListener (kafka.controller.KafkaController)
>>>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down
>>>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped
>>>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown
>>>> completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>>> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller
>>>> 100]: Stopped partition state machine (kafka.controller.
>>>> PartitionStateMachine)
>>>> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]:
>>>> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
>>>> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread],
>>>> Shutting down (kafka.controller.RequestSendThread)
>>>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>>>> Stopped  (kafka.controller.RequestSendThread)
>>>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>>>> Shutdown completed (kafka.controller.RequestSendThread)
>>>> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread],
>>>> Shutting down (kafka.controller.RequestSendThread)
>>>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>>>> Stopped  (kafka.controller.RequestSendThread)
>>>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>>>> Shutdown completed (kafka.controller.RequestSendThread)
>>>> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as
>>>> the controller (kafka.controller.KafkaController)
>>>> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!!
>>>> (kafka.controller.IsrChangeNotificationListener)
>>>> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]:
>>>> Broker change listener fired for path /brokers/ids with children 101,100
>>>> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
>>>> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete
>>>> topics listener fired for topics  to be deleted (kafka.controller.
>>>> PartitionStateMachine$DeleteTopicsListener)
>>>> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add
>>>> Partition triggered {"version":1,"partitions":{"4"
>>>> :[102,101],"5":[100

Re: A strange controller log in Kafka 0.9.0.1

2016-11-27 Thread Json Tu
Anybody? This is very disconcerting! If convenient, can somebody help to confirm 
this strange issue?

> On Nov 26, 2016, at 1:35 AM, Json Tu wrote:
> 
> Thanks Guozhang,
>   if it's convenient, can we discuss it in the JIRA 
> https://issues.apache.org/jira/browse/KAFKA-4447 ?
> I guess somebody else may also encounter this problem.
> 
>> On Nov 25, 2016, at 12:31 PM, Guozhang Wang wrote:
>> 
>> Does broker 100 keep acting as the controller afterwards? What you observe
>> is possible and should be transient since "unsubscribeChildChanges" on
>> ZkClient and listener fired procedure are executed on different threads and
>> they are not strictly synchronized. But if you continuously see broker
>> 100's listener fires and it acts like a controller then there may be an
>> issue with 0.9.0.1 version.
>> 
>> Guozhang
>> 
>> On Wed, Nov 23, 2016 at 7:28 AM, Json Tu  wrote:
>> 
>>> Hi,
>>>   We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a
>>> strange controller log as below.
>>> 
>>> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK
>>> expired; shut down all controller components and try to re-elect
>>> (kafka.controller.KafkaController$SessionExpirationListener)
>>> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning,
>>> broker id 100 (kafka.controller.KafkaController)
>>> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering
>>> IsrChangeNotificationListener (kafka.controller.KafkaController)
>>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down
>>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped
>>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown
>>> completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller
>>> 100]: Stopped partition state machine (kafka.controller.
>>> PartitionStateMachine)
>>> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]:
>>> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
>>> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread],
>>> Shutting down (kafka.controller.RequestSendThread)
>>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>>> Stopped  (kafka.controller.RequestSendThread)
>>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>>> Shutdown completed (kafka.controller.RequestSendThread)
>>> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread],
>>> Shutting down (kafka.controller.RequestSendThread)
>>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>>> Stopped  (kafka.controller.RequestSendThread)
>>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>>> Shutdown completed (kafka.controller.RequestSendThread)
>>> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as
>>> the controller (kafka.controller.KafkaController)
>>> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!!
>>> (kafka.controller.IsrChangeNotificationListener)
>>> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]:
>>> Broker change listener fired for path /brokers/ids with children 101,100
>>> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
>>> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete
>>> topics listener fired for topics  to be deleted (kafka.controller.
>>> PartitionStateMachine$DeleteTopicsListener)
>>> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add
>>> Partition triggered {"version":1,"partitions":{"4"
>>> :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>>> for path /brokers/topics/movie.gateway.merselllog.syncCinema
>>> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
>>> [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add
>>> Partition triggered {"version":1,"partitions":{"4"
>>> :[102,101],"5":[100,102],"1":[102,100],"

Re: A strange controller log in Kafka 0.9.0.1

2016-11-25 Thread Json Tu
Thanks Guozhang,
if it's convenient, can we discuss it in the JIRA 
https://issues.apache.org/jira/browse/KAFKA-4447 ?
I guess somebody else may also encounter this problem.

> On Nov 25, 2016, at 12:31 PM, Guozhang Wang wrote:
> 
> Does broker 100 keep acting as the controller afterwards? What you observe
> is possible and should be transient since "unsubscribeChildChanges" on
> ZkClient and listener fired procedure are executed on different threads and
> they are not strictly synchronized. But if you continuously see broker
> 100's listener fires and it acts like a controller then there may be an
> issue with 0.9.0.1 version.
> 
> Guozhang
> 
> On Wed, Nov 23, 2016 at 7:28 AM, Json Tu  wrote:
> 
>> Hi,
>>We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a
>> strange controller log as below.
>> 
>> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK
>> expired; shut down all controller components and try to re-elect
>> (kafka.controller.KafkaController$SessionExpirationListener)
>> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning,
>> broker id 100 (kafka.controller.KafkaController)
>> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering
>> IsrChangeNotificationListener (kafka.controller.KafkaController)
>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down
>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped
>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown
>> completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller
>> 100]: Stopped partition state machine (kafka.controller.
>> PartitionStateMachine)
>> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]:
>> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
>> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread],
>> Shutting down (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>> Stopped  (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>> Shutdown completed (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread],
>> Shutting down (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>> Stopped  (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>> Shutdown completed (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as
>> the controller (kafka.controller.KafkaController)
>> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!!
>> (kafka.controller.IsrChangeNotificationListener)
>> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]:
>> Broker change listener fired for path /brokers/ids with children 101,100
>> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
>> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete
>> topics listener fired for topics  to be deleted (kafka.controller.
>> PartitionStateMachine$DeleteTopicsListener)
>> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>> for path /brokers/topics/movie.gateway.merselllog.syncCinema
>> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
>> [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>> for path /brokers/topics/push_3rdparty_high (kafka.controller.
>> PartitionStateMachine$AddPartitionsListener)
>> [2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2&

A strange controller log in Kafka 0.9.0.1

2016-11-23 Thread Json Tu
Hi,
We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a strange 
controller log as below.

[2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK expired; 
shut down all controller components and try to re-elect 
(kafka.controller.KafkaController$SessionExpirationListener)
[2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning, broker 
id 100 (kafka.controller.KafkaController)
[2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering 
IsrChangeNotificationListener (kafka.controller.KafkaController)
[2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped  
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown completed 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-11-07 03:14:48,580] INFO [Partition state machine on Controller 100]: 
Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]: 
Stopped replica state machine (kafka.controller.ReplicaStateMachine)
[2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread], 
Shutting down (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
Stopped  (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
Shutdown completed (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread], 
Shutting down (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
Stopped  (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
Shutdown completed (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as the 
controller (kafka.controller.KafkaController)
[2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!! 
(kafka.controller.IsrChangeNotificationListener)
[2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]: Broker 
change listener fired for path /brokers/ids with children 101,100 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete topics 
listener fired for topics  to be deleted 
(kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
 for path /brokers/topics/movie.gateway.merselllog.syncCinema 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
[2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
 for path /brokers/topics/push_3rdparty_high 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
[2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
 for path /brokers/topics/icb_msg_push_high_02 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
[2016-11-07 03:14:48,715] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[102,100],"5":[100,101],"1":[102,101],"0":[101,100],"2":[100,102],"3":[101,102]}}
 for path /brokers/topics/movie.gateway.merselllog.unlockSeat 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)


From the log we can see that the old controller 100 resigned as the 
controller successfully, but what confuses me is that it still received a 
Fired!!! event from the IsrChangeNotificationListener, which had been de-registered before,
and broker 100 was not elected as the new controller afterwards. Yet we can see 
IsrChangeNotificationListener, DeleteTopicsListener and AddPartitionsListener all 
fired after the resignation. Does this mean something is wrong with ZooKeeper?
Any suggestion is appreciated, thanks in advance.
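
One note that may help the analysis: deregistering a ZkClient listener does not cancel events that are already queued on the ZkClient event thread, so a late Fired!!! by itself does not prove a ZooKeeper problem. A rough Scala sketch of the usual guard, purely illustrative and not the actual controller code:

import java.util.concurrent.atomic.AtomicBoolean
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

// a listener that drops events delivered after it has logically been retired
class GuardedBrokerChangeListener extends IZkChildListener {
  val active = new AtomicBoolean(true)

  override def handleChildChange(parentPath: String, children: java.util.List[String]): Unit = {
    if (!active.get()) return   // the event was queued before we unsubscribed; ignore it
    // ... act on the broker list change ...
  }
}

// registration / resignation, roughly (zkClient is an org.I0Itec.zkclient.ZkClient):
// val listener = new GuardedBrokerChangeListener
// zkClient.subscribeChildChanges("/brokers/ids", listener)
// ...
// listener.active.set(false)                                  // retire the listener first
// zkClient.unsubscribeChildChanges("/brokers/ids", listener)  // then deregister it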





replica fetch error and log flooding

2016-11-07 Thread Json Tu
Hi, when I move __consumer_offsets partitions from an old broker to a new broker, we encounter 
the following error, and it keeps flooding the log.
server.log.2016-11-07-19:[2016-11-07 19:17:15,392] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,476] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,573] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,640] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,697] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,770] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)

Can anyone help solve this? Thanks.


KAFKA-4360 issue

2016-10-31 Thread Json Tu

> On Nov 1, 2016, at 10:54 AM, huxi (JIRA) wrote:
> 
> 
>[ 
> https://issues.apache.org/jira/browse/KAFKA-4360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624155#comment-15624155
>  ] 
> 
> huxi commented on KAFKA-4360:
> -
> 
> Excellent analysis! What intrigues me is whether this is a deadlock issue 
> or a liveness issue. Here is my analysis:
> 1. Say at time T1 the ZooKeeper session expires, so the 'handleNewSession' 
> method of SessionExpirationListener is executed, thereby obtaining the 
> controller lock (controllerContext.controllerLock).
> 2. It then invokes the 'onControllerResignation' method to have the current 
> controller quit, which shuts down the leader rebalance scheduler by calling 
> KafkaScheduler.shutdown.
> 3. The 'shutdown' method shuts down the ScheduledThreadPoolExecutor and 
> blocks until all tasks submitted before the shutdown request have completed.
> 4. If any task was submitted before shutdown is called, the 
> check-imbalance task starts by checking isActive, which 
> acquires the controller lock at the very beginning and is soon blocked 
> because the lock is already held by the main thread.
> 5. In that case, the main thread blocks in the onControllerResignation method 
> until one day has elapsed (the default) or someone interrupts the check thread.
> 
> Does it make sense?
> 
> 
>> Controller may deadLock when autoLeaderRebalance encounter zk expired
>> -
>> 
>>Key: KAFKA-4360
>>URL: https://issues.apache.org/jira/browse/KAFKA-4360
>>    Project: Kafka
>> Issue Type: Bug
>> Components: controller
>>   Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>>   Reporter: Json Tu
>> Labels: bugfix
>>Attachments: yf-mafka2-common02_jstack.txt
>> 
>>  Original Estimate: 168h
>> Remaining Estimate: 168h
>> 
>> When the controller has a checkAndTriggerPartitionRebalance task in the 
>> autoRebalanceScheduler and the ZK session expires at that moment, it will
>> run into a deadlock.
>> We can reconstruct the scene as follows: when the ZK session expires, the ZK thread 
>> calls handleNewSession, which is defined in SessionExpirationListener; it 
>> takes controllerContext.controllerLock and then calls 
>> autoRebalanceScheduler.shutdown(), which must wait for all tasks in the 
>> autoRebalanceScheduler to complete. But a task in that thread pool also needs 
>> controllerContext.controllerLock, which is already owned by the ZK callback 
>> thread, so it runs into a deadlock.
>> Because of that, it causes at least two problems: first, the broker's 
>> id cannot be registered in ZooKeeper, so the broker is considered dead by the 
>> new controller; second, the process cannot be stopped by 
>> kafka-server-stop.sh, because the shutdown function
>> also cannot get controllerContext.controllerLock, so we cannot shut down Kafka 
>> except with kill -9.
>> In the attachment I uploaded a jstack file, which was created when my Kafka 
>> process could not be shut down by kafka-server-stop.sh.
>> I have met this scenario several times; I think this may be an unsolved bug 
>> in Kafka.
> 
> 
> 
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
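
A minimal, self-contained sketch of the lock ordering described above, with a plain ReentrantLock and ScheduledExecutorService standing in for controllerContext.controllerLock and the KafkaScheduler (the timings are arbitrary):

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object ControllerDeadlockSketch extends App {
  val lock = new ReentrantLock()                        // stands in for controllerContext.controllerLock
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  // the periodic check-and-trigger-rebalance style task: grabs the lock first
  scheduler.scheduleAtFixedRate(new Runnable {
    def run(): Unit = { lock.lock(); try { /* check imbalance */ } finally lock.unlock() }
  }, 200, 1000, TimeUnit.MILLISECONDS)

  // the ZK session-expiry callback: grabs the lock, then waits for the scheduler to drain
  lock.lock()
  try {
    Thread.sleep(500)                              // let one periodic run start and block on the lock
    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.DAYS)   // stuck: the running task is waiting for our lock
  } finally lock.unlock()
}

Run as written, it hangs in awaitTermination, which matches step 5 of the analysis above.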




KAFKA-4360 issue

2016-10-31 Thread Json Tu
Hi,
Can someone discuss this in KAFKA-4360? Thanks.

> On Nov 1, 2016, at 10:54 AM, huxi (JIRA) wrote:
> 
> 
>   [ 
> https://issues.apache.org/jira/browse/KAFKA-4360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624155#comment-15624155
>  ] 
> 
> huxi commented on KAFKA-4360:
> -
> 
> Excellent analysis! What intrigues me is whether this is a deadlock issue 
> or a liveness issue. Here is my analysis:
> 1. Say at time T1 the ZooKeeper session expires, so the 'handleNewSession' 
> method of SessionExpirationListener is executed, thereby obtaining the 
> controller lock (controllerContext.controllerLock).
> 2. It then invokes the 'onControllerResignation' method to have the current 
> controller quit, which shuts down the leader rebalance scheduler by calling 
> KafkaScheduler.shutdown.
> 3. The 'shutdown' method shuts down the ScheduledThreadPoolExecutor and 
> blocks until all tasks submitted before the shutdown request have completed.
> 4. If any task was submitted before shutdown is called, the 
> check-imbalance task starts by checking isActive, which 
> acquires the controller lock at the very beginning and is soon blocked 
> because the lock is already held by the main thread.
> 5. In that case, the main thread blocks in the onControllerResignation method 
> until one day has elapsed (the default) or someone interrupts the check thread.
> 
> Does it make sense?
> 
> 
>> Controller may deadLock when autoLeaderRebalance encounter zk expired
>> -
>> 
>>   Key: KAFKA-4360
>>   URL: https://issues.apache.org/jira/browse/KAFKA-4360
>>   Project: Kafka
>>Issue Type: Bug
>>Components: controller
>>  Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>>  Reporter: Json Tu
>>Labels: bugfix
>>   Attachments: yf-mafka2-common02_jstack.txt
>> 
>> Original Estimate: 168h
>> Remaining Estimate: 168h
>> 
>> When the controller has a checkAndTriggerPartitionRebalance task in the 
>> autoRebalanceScheduler and the ZK session expires at that moment, it will
>> run into a deadlock.
>> We can reconstruct the scene as follows: when the ZK session expires, the ZK thread 
>> calls handleNewSession, which is defined in SessionExpirationListener; it 
>> takes controllerContext.controllerLock and then calls 
>> autoRebalanceScheduler.shutdown(), which must wait for all tasks in the 
>> autoRebalanceScheduler to complete. But a task in that thread pool also needs 
>> controllerContext.controllerLock, which is already owned by the ZK callback 
>> thread, so it runs into a deadlock.
>> Because of that, it causes at least two problems: first, the broker's 
>> id cannot be registered in ZooKeeper, so the broker is considered dead by the 
>> new controller; second, the process cannot be stopped by 
>> kafka-server-stop.sh, because the shutdown function
>> also cannot get controllerContext.controllerLock, so we cannot shut down Kafka 
>> except with kill -9.
>> In the attachment I uploaded a jstack file, which was created when my Kafka 
>> process could not be shut down by kafka-server-stop.sh.
>> I have met this scenario several times; I think this may be an unsolved bug 
>> in Kafka.
> 
> 
> 
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)




Kafka cannot shutdown

2016-10-28 Thread Json Tu
Hi all,
We have a Kafka cluster with 11 nodes, and we found that for some 
partitions the number of replicas in the ISR is not equal to the replica count. Because our data traffic is 
small, we expect the ISR size to eventually equal the replica count,
but it never recovers to normal, so we tried to shut down a broker that has 
follower partitions that have not caught up with their leaders.
Before we shut down the broker, we found that the broker's id was not in 
ZooKeeper's /brokers/ids children, so I think it was disconnected from ZooKeeper again 
because of some network issue, while the process stayed alive. We have seen this 
phenomenon several
times; we think a ZooKeeper callback was missed, so the zkclient could not 
register the broker again.
But that is not my point. My point is: we stop Kafka, but it cannot 
exit normally, because it stalls in KafkaController.shutdown() for a very 
long time, and we cannot exit the broker until we use kill -9.


To investigate, we took a jstack of the process and found it stuck in 
autoRebalanceScheduler.shutdown(); the stack trace is in my email's attachment.

Can someone help with this? Thank you very much.
2016-10-28 22:22:44
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

"Attach Listener" daemon prio=10 tid=0x7f29b0002000 nid=0x27d6 waiting on 
condition [0x]
   java.lang.Thread.State: RUNNABLE

"Thread-2" prio=10 tid=0x7f299049b800 nid=0x241f waiting on condition 
[0x7f2888a0c000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xbd411f20> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:692)
at 
kafka.server.KafkaServer$$anonfun$shutdown$10.apply$mcV$sp(KafkaServer.scala:543)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:51)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:51)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:543)
at 
kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:49)
at kafka.Kafka$$anon$1.run(Kafka.scala:63)

"SIGTERM handler" daemon prio=10 tid=0x7f29b0001000 nid=0x241c in 
Object.wait() [0x7f2889059000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
- locked <0xbc7fa470> (a kafka.Kafka$$anon$1)
at java.lang.Thread.join(Thread.java:1355)
at 
java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at 
java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0xbc70ada0> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)

"main-EventThread" daemon prio=10 tid=0x7f2984027800 nid=0x46d3 waiting on 
condition [0x7f2888c55000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xdc11ae78> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:494)

"main-SendThread(10.4.232.86:2181)" daemon prio=10 tid=0x7f2984026800 
nid=0x46d2 runnable [0x7f2888d56000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun

Re: handleFetchRequest throw exception

2016-10-26 Thread Json Tu
Thanks to Guozhang.
Following your suggestions, I found that my own patch to Kafka 0.9.0.0 may cause 
the problem.
In DelayedFetch.scala, IntelliJ's auto-import added 
org.apache.kafka.common.errors.NotLeaderForPartitionException instead of 
kafka.common.NotLeaderForPartitionException,
so the kafka.common.NotLeaderForPartitionException thrown inside 
getLeaderReplicaIfLocal cannot be caught by tryComplete() 
and propagates all the way up to handle(). I think this may be the cause of the 
repeated error logs and the other strange behaviour.
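
A tiny Scala sketch of that mix-up (the two class names are real; the surrounding code is simplified and is not the actual DelayedFetch code). In 0.9 the server side throws the Scala kafka.common exception, so a catch clause written against the Java client class never matches and the exception escapes:

import kafka.common.{NotLeaderForPartitionException => ServerNLFPE}
import org.apache.kafka.common.errors.{NotLeaderForPartitionException => ClientNLFPE}

object WrongImportSketch extends App {
  try {
    throw new ServerNLFPE("leader has moved")   // what getLeaderReplicaIfLocal throws in 0.9
  } catch {
    case _: ClientNLFPE => println("handled")   // the auto-imported class: never matches
  }
}

Running it terminates with the uncaught kafka.common exception, mirroring how the exception escaped tryComplete() and surfaced in handle().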

> On Oct 27, 2016, at 7:31 AM, Guozhang Wang wrote:
> 
> Json,
> 
> As you mentioned yourself the "NotLeaderForPartitionException" thrown
> from getLeaderReplicaIfLocal
> should be caught in the end, and hence I'm not sure why the reported stack
> trace "ERROR: ..." throwing the NotLeaderForPartitionException should be
> seen from "tryComplete". Also I have checked the source code in both
> 0.9.0.0 and 0.8.2.2, their line numbers does not match with the reported
> stack trace line numbers (e.g. DelayedFetch.scala:72), so I cannot really
> tell why you could ever see the error message instead of the
> DEBUG-level "Broker
> is no longer the leader of %s, satisfy %s immediately..".
> 
> Following the 0.9.0.0 source code, since it NotLeaderForPartitionException
> is caught and force the delayed fetch request to be sent with some
> potential error code, it will not cause the replica's fetch request to be
> not return successfully to the fetch broker, and hence should not lead
> producer / consumer to fail for a long time. Similarly, since we force
> completing those delayed fetch requests as well, it should not cause a spam
> of repeated error log entries since it should at most print one entry (and
> should be DEBUG not ERROR) for each delayed request whose partition leaders
> have migrated out.
> 
> 
> 
> Guozhang
> 
> 
> 
> On Wed, Oct 26, 2016 at 7:46 AM, Json Tu  wrote:
> 
>> It makes the cluster unable to provide normal service, which leaves some
>> producers and fetchers failing for a long time until I restart the current broker.
>> This error may come from an earlier fetch request that contains
>> this partition, which leads to many fetch response errors.
>> 
>> The DelayedFetch tryComplete() function is implemented as below:
>> override def tryComplete() : Boolean = {
>> var accumulatedSize = 0
>> fetchMetadata.fetchPartitionStatus.foreach {
>>   case (topicAndPartition, fetchStatus) =>
>> val fetchOffset = fetchStatus.startOffsetMetadata
>> try {
>>   if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>> val replica = 
>> replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic,
>> topicAndPartition.partition)
>> /*ignore some codes*/
>>   }
>> } catch {
>>   /*ignore some code*/
>>   case nle: NotLeaderForPartitionException =>  // Case A
>> debug("Broker is no longer the leader of %s, satisfy %s
>> immediately".format(topicAndPartition, fetchMetadata))
>> return forceComplete()
>> }
>> }
>> /* ignore some codes */
>> }
>> 
>> When it meets a NotLeaderForPartitionException, it invokes the forceComplete()
>> function, which then invokes the onComplete() function, implemented as
>> below:
>> override def onComplete() {
>> val logReadResults = replicaManager.readFromLocalLog(
>> fetchMetadata.fetchOnlyLeader,
>>   fetchMetadata.fetchOnlyCommitted,
>>   fetchMetadata.fetchPartitionStatus.mapValues(status =>
>> status.fetchInfo))
>> 
>> val fetchPartitionData = logReadResults.mapValues(result =>
>>   FetchResponsePartitionData(result.errorCode, result.hw,
>> result.info.messageSet))
>> 
>> responseCallback(fetchPartitionData)
>> }
>> 
>> So I think it exits the tryComplete function early because of this
>> partition, which means the partitions later in this request may not be
>> completely satisfied and returned to the fetching broker,
>> which leads some producers and consumers to fail for a long time. I don't know
>> whether this is correct.
>> 
>>> On Oct 25, 2016, at 8:32 PM, Json Tu wrote:
>>> 
>>> Hi all,
>>>  I use Kafka 0.9.0.0, and we have a cluster with 6 nodes, when I
>> restart a broker,we find there are many logs as below,
>>> 
>>> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
>> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
>> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
>> MinBytes: 1 b

Re: handleFetchRequest throw exception

2016-10-26 Thread Json Tu
It makes the cluster unable to provide normal service, which leaves some producers 
or fetchers failing for a long time until I restart the current broker.
This error may come from an earlier fetch request that contains this 
partition, which leads to many fetch response errors.

DelayedFetch's tryComplete() function is implemented as follows:

override def tryComplete(): Boolean = {
  var accumulatedSize = 0
  fetchMetadata.fetchPartitionStatus.foreach {
    case (topicAndPartition, fetchStatus) =>
      val fetchOffset = fetchStatus.startOffsetMetadata
      try {
        if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
          val replica = replicaManager.getLeaderReplicaIfLocal(
            topicAndPartition.topic, topicAndPartition.partition)
          /* ignore some code */
        }
      } catch {
        /* ignore some code */
        case nle: NotLeaderForPartitionException =>  // Case A
          debug("Broker is no longer the leader of %s, satisfy %s immediately"
            .format(topicAndPartition, fetchMetadata))
          return forceComplete()
      }
  }
  /* ignore some code */
}

When it hits a NotLeaderForPartitionException, it invokes forceComplete(), which in
turn invokes onComplete(), implemented as follows:

override def onComplete() {
  val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
    fetchMetadata.fetchOnlyCommitted,
    fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))

  val fetchPartitionData = logReadResults.mapValues(result =>
    FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))

  responseCallback(fetchPartitionData)
}

So I think it exits tryComplete() early because of this one partition, which means
the partitions later in this request may not be fully satisfied and returned to the
fetching broker, which leads some producers and consumers to fail for a long time.
I don't know whether this reading is correct; a toy model of the flow is sketched
below.
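
To sanity-check this concern, here is a toy, self-contained model of the flow in the
two snippets above (an assumed simplification, not the real classes): tryComplete()
bails out at the first partition whose leader has moved, but because it does so
through forceComplete(), onComplete() still reads every partition in the request and
the callback receives a result for each of them, with an error set only on the
migrated partition.

object DelayedFetchFlowSketch {

  case class Result(partition: String, error: Option[String], bytes: Int)

  val localLeaders = Set("p0", "p2")            // "p1"'s leadership has moved away
  val fetchPartitions = Seq("p0", "p1", "p2")   // partitions carried by one fetch request

  def readFromLocalLog(partition: String): Result =
    if (localLeaders.contains(partition)) Result(partition, None, bytes = 100)
    else Result(partition, Some("NotLeaderForPartition"), bytes = 0)

  // onComplete() re-reads *every* partition and hands all results to the callback.
  def onComplete(responseCallback: Seq[Result] => Unit): Unit =
    responseCallback(fetchPartitions.map(readFromLocalLog))

  def forceComplete(responseCallback: Seq[Result] => Unit): Boolean = {
    onComplete(responseCallback)
    true
  }

  // tryComplete() bails out at the first partition whose leader moved, but because
  // it does so via forceComplete(), the whole request is still answered.
  def tryComplete(responseCallback: Seq[Result] => Unit): Boolean = {
    fetchPartitions.foreach { p =>
      if (!localLeaders.contains(p))
        return forceComplete(responseCallback)   // mirrors "Case A" above
    }
    false
  }

  def main(args: Array[String]): Unit =
    tryComplete { results => results.foreach(println) }
    // prints Result(p0,None,100), Result(p1,Some(NotLeaderForPartition),0), Result(p2,None,100)
}

Under this model the later partitions are not dropped; they are answered in the same
forced response, which matches Guozhang's reading of the 0.9.0.0 code above.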

> On 25 Oct 2016, at 20:32, Json Tu wrote:
> 
> Hi all,
>   I use Kafka 0.9.0.0, and we have a cluster with 6 nodes. When I restart
> a broker, we see many log entries like the below:
> 
> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling 
> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId: 
> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms; 
> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] -> 
> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] -> 
> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
>  -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] -> 
> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] -> 
> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] -> 
> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28] 
> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0] -> 
> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] -> 
> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14] -> 
> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31] 
> -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] -> 
> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] -> 
> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] -> 
> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] -> 
> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] -> 
> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] -> 
> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1] -> 
> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0] -> 
> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13]
>  -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55] 
> -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] -> 
> PartitionFetchInfo(442564,1048576),[waimai_ordersa_topic_user_order_in_poi_count_diff,5]
>  -> PartitionFetchInfo(23791010,1048576),[retail.c.order.create,4] -> 
> PartitionFetchInfo(72902,1048576),[waimai_c_ucenter_asyncrelationbind_staging,2]
>  -> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] -> 
> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] -> 
> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15] -> 
> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_ordersta

handleFetchRequest throw exception

2016-10-25 Thread Json Tu
Hi all,
I use Kafka 0.9.0.0, and we have a cluster with 6 nodes. When I restart
a broker, we see many log entries like the below:

[2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling request 
Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId: 
ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms; MinBytes: 
1 bytes; RequestInfo: [retail.c.order.logistics,0] -> 
PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] -> 
PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
 -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] -> 
PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] -> 
PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] -> 
PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28] -> 
PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0] -> 
PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] -> 
PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14] -> 
PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31] -> 
PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] -> 
PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] -> 
PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] -> 
PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] -> 
PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] -> 
PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] -> 
PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] -> 
PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1] -> 
PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0] -> 
PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13]
 -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55] 
-> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] -> 
PartitionFetchInfo(442564,1048576),[waimai_ordersa_topic_user_order_in_poi_count_diff,5]
 -> PartitionFetchInfo(23791010,1048576),[retail.c.order.create,4] -> 
PartitionFetchInfo(72902,1048576),[waimai_c_ucenter_asyncrelationbind_staging,2]
 -> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] -> 
PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] -> 
PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] -> 
PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] -> 
PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15] -> 
PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] -> 
PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24] -> 
PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18] -> 
PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36] -> 
PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
kafka.common.NotLeaderForPartitionException: Leader not local for partition [retail.d.ris.spider.request,1] on broker 2141642
    at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:296)
    at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77)
    at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:72)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
    at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:202)
    at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:372)
    at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
    at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:243)
    at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
    at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
    at kafka.server.KafkaApis

Re: client use high cpu which caused by delayedFetch operation immediately return

2016-10-18 Thread Json Tu
Thanks. I applied the patch, and everything works fine now.
> On 9 Oct 2016, at 12:39, Becket Qin wrote:
> 
> Can you check if you have KAFKA-3003 when you run the code?
> 
> On Sat, Oct 8, 2016 at 12:52 AM, Kafka  wrote:
> 
>> Hi all,
>>    We found our consumers have high CPU load in our production
>> environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect how
>> frequently the broker returns fetch responses, so we set both very large so
>> that the broker can hardly ever satisfy the fetch (see the consumer
>> configuration sketch at the end of this message).
>>    That did not solve the problem, so we checked Kafka's code and found
>> that DelayedFetch's tryComplete() function contains this code:
>> 
>> if (endOffset.messageOffset != fetchOffset.messageOffset) {
>>   if (endOffset.onOlderSegment(fetchOffset)) {
>>     // Case C, this can happen when the new fetch operation is on a truncated leader
>>     debug("Satisfying fetch %s since it is fetching later segments of partition %s."
>>       .format(fetchMetadata, topicAndPartition))
>>     return forceComplete()
>>   } else if (fetchOffset.onOlderSegment(endOffset)) {
>>     // Case C, this can happen when the fetch operation is falling behind the current segment
>>     // or the partition has just rolled a new segment
>>     debug("Satisfying fetch %s immediately since it is fetching older segments."
>>       .format(fetchMetadata))
>>     return forceComplete()
>>   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
>>     // we need take the partition fetch size as upper bound when accumulating the bytes
>>     accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
>>   }
>> }
>> 
>> So we can be sure that our fetchOffset's segmentBaseOffset is not the same
>> as endOffset's segmentBaseOffset. We then checked our topic-partition's
>> segments and found that all the data in that segment had been cleaned up by
>> Kafka due to log.retention.
>> We guess that the fetchOffset's segmentBaseOffset being smaller than the
>> endOffset's segmentBaseOffset leads to this problem (a worked trace of this
>> case is sketched right after this paragraph).
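
As a rough trace of that guess (a toy model only; OffsetMeta merely borrows the field
names used in the snippet above): when retention has removed the segment a fetch
offset still points into, the fetch offset sits on an older segment than the leader's
end offset, so the second Case C branch force-completes the delayed fetch every time
and it never waits for fetch.wait.max.ms.

// Toy model of the Case C check above; only segmentBaseOffset and messageOffset are
// modeled, and onOlderSegment simply compares segment base offsets.
object CaseCTraceSketch {
  case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long) {
    def onOlderSegment(other: OffsetMeta): Boolean =
      segmentBaseOffset < other.segmentBaseOffset
  }

  def completesImmediately(fetchOffset: OffsetMeta, endOffset: OffsetMeta): Boolean =
    endOffset.messageOffset != fetchOffset.messageOffset &&
      (endOffset.onOlderSegment(fetchOffset) || fetchOffset.onOlderSegment(endOffset))

  def main(args: Array[String]): Unit = {
    // The fetch offset still points into a segment (base 0) that retention removed,
    // while the leader's end offset lives in a newer segment (base 100000).
    val fetchOffset = OffsetMeta(messageOffset = 4096L, segmentBaseOffset = 0L)
    val endOffset   = OffsetMeta(messageOffset = 120000L, segmentBaseOffset = 100000L)

    // Prints true: the delayed fetch is satisfied immediately on every attempt, so it
    // never waits for fetch.wait.max.ms, matching the tight loop reported above.
    println(completesImmediately(fetchOffset, endOffset))
  }
}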
>> 
>> But my point is: should we use code like the following so that the client
>> uses less CPU?
>>
>> if (endOffset.messageOffset != fetchOffset.messageOffset) {
>>   if (endOffset.onOlderSegment(fetchOffset)) {
>>     return false
>>   } else if (fetchOffset.onOlderSegment(endOffset)) {
>>     return false
>>   }
>> }
>>
>> The fetch would then be answered after fetch.wait.max.ms in this scenario
>> instead of returning immediately.
>> 
>> Feedback is greatly appreciated. Thanks.
>> 
>> 
>> 
>> 
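
For reference, the two thresholds discussed in this thread are plain consumer
configuration. Below is a minimal sketch using the 0.9 Java consumer from Scala; the
bootstrap server, group id, topic, and the concrete values are placeholders, and, as
this thread shows, a broker without the KAFKA-3003 fix may still answer immediately
regardless of these settings.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object LowFrequencyFetchConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")            // placeholder
    props.put("group.id", "low-frequency-fetch-example")      // placeholder
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    // Ask the broker to hold the fetch until at least 1 MB is available...
    props.put("fetch.min.bytes", "1048576")
    // ...or until 5 seconds have passed, whichever comes first.
    // (The old Scala consumer names this property fetch.wait.max.ms.)
    props.put("fetch.max.wait.ms", "5000")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Seq("some-topic").asJava)               // placeholder topic
    try {
      while (true) {
        val records = consumer.poll(10000L)
        records.asScala.foreach(r => println(s"${r.topic}-${r.partition}@${r.offset}"))
      }
    } finally consumer.close()
  }
}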




Lots of warn log in Kafka broker

2016-10-18 Thread Json Tu
Hi all,
I have a Kafka 0.9.0.0 cluster with 11 nodes.
First, I found server log entries like the following:
server.log.2016-10-17-22:[2016-10-17 22:22:13,885] WARN 
[ReplicaFetcherThread-0-4], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@367c9f98. Possible cause: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 1786735, only 2389 bytes available 
(kafka.server.ReplicaFetcherThread)
server.log.2016-10-17-22:[2016-10-17 22:22:15,456] WARN 
[ReplicaFetcherThread-0-5], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@12088f91. Possible cause: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 1338722, only 5662 bytes available 
(kafka.server.ReplicaFetcherThread)
server.log.2016-10-17-22:[2016-10-17 22:22:15,888] WARN 
[ReplicaFetcherThread-0-4], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@60069db2. Possible cause: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 1786735, only 2389 bytes available 
(kafka.server.ReplicaFetcherThread)
server.log.2016-10-17-22:[2016-10-17 22:22:17,460] WARN 
[ReplicaFetcherThread-0-5], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@4a5991cb. Possible cause: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 1338722, only 5662 bytes available 
(kafka.server.ReplicaFetcherThread)

Then I ran jstack against the broker process, and I saw:
"ReplicaFetcherThread-0-3" prio=10 tid=0x7f1254319800 nid=0xfdb runnable 
[0x7f0ee36d7000]
"ReplicaFetcherThread-0-8" prio=10 tid=0x7f1278141800 nid=0x66f runnable 
[0x7f0ee2ecf000]
"ReplicaFetcherThread-0-9" prio=10 tid=0x7f1278127000 nid=0x66e runnable 
[0x7f0ee2fd]
"ReplicaFetcherThread-0-4" prio=10 tid=0x7f127810c800 nid=0x66d waiting on 
condition [0x7f0ee30d1000]
"ReplicaFetcherThread-0-1" prio=10 tid=0x7f12780ef800 nid=0x66c runnable 
[0x7f0ee31d2000]
"ReplicaFetcherThread-0-7" prio=10 tid=0x7f12780d4800 nid=0x66b runnable 
[0x7f0ee32d3000]
"ReplicaFetcherThread-0-5" prio=10 tid=0x7f12780b9800 nid=0x66a waiting on 
condition [0x7f0ee33d4000]
"ReplicaFetcherThread-0-6" prio=10 tid=0x7f127809f000 nid=0x669 runnable 
[0x7f0ee34d5000]
"ReplicaFetcherThread-0-2" prio=10 tid=0x7f1278084800 nid=0x668 runnable 
[0x7f0ee35d6000]
"ReplicaFetcherThread-0-10" prio=10 tid=0x7f127804c800 nid=0x666 runnable 
[0x7f0ee37d8000]

 The jstack output shows that two replica fetcher threads are waiting on a condition.
 My cluster has no broker version compatibility problem. From the logs I thought
there were some exceptions on broker 4 and broker 5, so I restarted them, and
everything went back to normal.

 What does this log mean, and how can it occur?
 
 Will appreciate if anyone has any insight on what's happening here.
 Thanks.
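
Regarding the "Error reading array of size N, only M bytes available" warnings: that
message appears when a length-prefixed field in the fetch response declares more bytes
than the buffer actually contains, i.e. the response the follower received was
truncated or otherwise malformed. The sketch below only imitates such a
length-prefixed read to show how the check fails; it is not Kafka's protocol code.

import java.nio.ByteBuffer

// Simplified imitation of reading a length-prefixed array from a response buffer;
// the real protocol schema classes perform an equivalent bounds check.
object TruncatedResponseSketch {
  def readSizedArray(buffer: ByteBuffer): Array[Byte] = {
    val size = buffer.getInt()                       // declared array size
    if (size > buffer.remaining())
      throw new IllegalStateException(
        s"Error reading array of size $size, only ${buffer.remaining()} bytes available")
    val bytes = new Array[Byte](size)
    buffer.get(bytes)
    bytes
  }

  def main(args: Array[String]): Unit = {
    // A response claiming a 1,786,735-byte payload but carrying only 2,389 bytes,
    // as in the WARN log above.
    val truncated = ByteBuffer.allocate(4 + 2389)
    truncated.putInt(1786735)
    truncated.put(new Array[Byte](2389))
    truncated.flip()
    readSizedArray(truncated)                        // throws, mirroring the warning
  }
}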