Hi Jun,

Setting to -1, may solve this issue.
But it will cause producer buffer full in load test resulting to failures
and drop of messages from client(producer side)
Hence, this will not actually solve the problem.

I need to fix this from kafka broker side, so that there is no impact on
producer or consumer.

>From the logs looks like there is connection problem during between brokers
and kafka broker is loosing records during this process.

But why is kafka broker loosing records,

I feel this is a BUG in kafka.

[2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to trigger
partition rebalance (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred replicas by
broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
[topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
[topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
[topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
[topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
[topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
[topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
[topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
[topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
[topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
-> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
[topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
-> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
[topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
-> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
[topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
[topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
[topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
-> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
[topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
[topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
[topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
List(4, 2))) (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 0 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 5 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 1 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 2 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 3 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 4 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,36],
[topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74],
[topic1,82], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-send-thread],
Controller 2 epoch 2 fails to send request {controller_id=2,controller_
epoch=2,partition_states=[{topic=topic1,partition=82,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=30,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=78,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=12,replicas=[5,3]},{topic=topic1,partition=86,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=31,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=20,replicas=[3,5]},{topic=topic1,partition=36,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=74,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=33,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
points=[{port=9092,host=b5.kafka,security_protocol_type=
0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.
kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
points=[{port=9092,host=b4.kafka,security_protocol_type=
0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]}
to broker Node(2, b2.kafka, 9092). Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.io.IOException: Connection to 2 was disconnected before the response
was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recurse$1(
NetworkClientBlockingOps.scala:129)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.
scala:139)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
extension(NetworkClientBlockingOps.scala:80)
        at kafka.controller.RequestSendThread.liftedTree1$
1(ControllerChannelManager.scala:180)
        at kafka.controller.RequestSendThread.doWork(
ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-send-thread],
Controller 2 epoch 2 fails to send request {controller_id=2,controller_
epoch=2,partition_states=[{topic=topic1,partition=82,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=30,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=78,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=12,replicas=[5,3]},{topic=topic1,partition=86,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=31,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=20,replicas=[3,5]},{topic=topic1,partition=36,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=74,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=33,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]}],live_brokers=[{id=0,end_
points=[{port=9092,host=b0.kafka,security_protocol_type=
0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_
points=[{port=9092,host=b2.kafka,security_protocol_type=
0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]}
to broker Node(5, b5.kafka, 9092). Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.io.IOException: Connection to 5 was disconnected before the response
was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recurse$1(
NetworkClientBlockingOps.scala:129)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.
scala:139)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
extension(NetworkClientBlockingOps.scala:80)
        at kafka.controller.RequestSendThread.liftedTree1$
1(ControllerChannelManager.scala:180)
        at kafka.controller.RequestSendThread.doWork(
ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-send-thread],
Controller 2 epoch 2 fails to send request {controller_id=2,controller_
epoch=2,partition_states=[{topic=topic1,partition=82,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=30,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=78,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=12,replicas=[5,3]},{topic=topic1,partition=86,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=31,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=20,replicas=[3,5]},{topic=topic1,partition=36,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=74,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=33,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
points=[{port=9092,host=b3.kafka,security_protocol_type=
0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_
protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.
kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_
points=[{port=9092,host=b0.kafka,security_protocol_type=
0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]}]}
to broker Node(4, b4.kafka, 9092). Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.io.IOException: Connection to 4 was disconnected before the response
was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recurse$1(
NetworkClientBlockingOps.scala:129)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.
scala:139)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
extension(NetworkClientBlockingOps.scala:80)
        at kafka.controller.RequestSendThread.liftedTree1$
1(ControllerChannelManager.scala:180)
        at kafka.controller.RequestSendThread.doWork(
ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-send-thread],
Controller 2 epoch 2 fails to send request {controller_id=2,controller_
epoch=2,partition_states=[{topic=topic1,partition=82,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=30,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=78,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=12,replicas=[5,3]},{topic=topic1,partition=86,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=31,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=20,replicas=[3,5]},{topic=topic1,partition=36,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=74,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=33,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
points=[{port=9092,host=b5.kafka,security_protocol_type=
0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
points=[{port=9092,host=b4.kafka,security_protocol_type=
0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]}]}
to broker Node(1, b1.kafka, 9092). Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.io.IOException: Connection to 1 was disconnected before the response
was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recurse$1(
NetworkClientBlockingOps.scala:129)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.
scala:139)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
extension(NetworkClientBlockingOps.scala:80)
        at kafka.controller.RequestSendThread.liftedTree1$
1(ControllerChannelManager.scala:180)
        at kafka.controller.RequestSendThread.doWork(
ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-send-thread],
Controller 2 epoch 2 fails to send request {controller_id=2,controller_
epoch=2,partition_states=[{topic=topic1,partition=82,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=30,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=78,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=12,replicas=[5,3]},{topic=topic1,partition=86,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=31,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=20,replicas=[3,5]},{topic=topic1,partition=36,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=74,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=33,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
points=[{port=9092,host=b3.kafka,security_protocol_type=
0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.
kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_
points=[{port=9092,host=b1.kafka,security_protocol_type=
0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]}
to broker Node(3, b3.kafka, 9092). Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.io.IOException: Connection to 3 was disconnected before the response
was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recurse$1(
NetworkClientBlockingOps.scala:129)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.
scala:139)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
extension(NetworkClientBlockingOps.scala:80)
        at kafka.controller.RequestSendThread.liftedTree1$
1(ControllerChannelManager.scala:180)
        at kafka.controller.RequestSendThread.doWork(
ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-send-thread],
Controller 2 epoch 2 fails to send request {controller_id=2,controller_
epoch=2,partition_states=[{topic=topic1,partition=82,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=30,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=78,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=12,replicas=[5,3]},{topic=topic1,partition=86,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=31,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=20,replicas=[3,5]},{topic=topic1,partition=36,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]},{topic=topic1,partition=74,
controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
version=14,replicas=[5,3]},{topic=topic1,partition=33,
controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
version=18,replicas=[3,5]}],live_brokers=[{id=4,end_
points=[{port=9092,host=b4.kafka,security_protocol_type=
0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.
kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_
points=[{port=9092,host=b5.kafka,security_protocol_type=
0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]}
to broker Node(0, b0.kafka, 9092). Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.io.IOException: Connection to 0 was disconnected before the response
was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$
blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recurse$1(
NetworkClientBlockingOps.scala:129)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.
scala:139)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
extension(NetworkClientBlockingOps.scala:80)
        at kafka.controller.RequestSendThread.liftedTree1$
1(ControllerChannelManager.scala:180)
        at kafka.controller.RequestSendThread.doWork(
ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,50])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-send-thread],
Controller 2 connected to Node(2, b2.kafka, 9092) for sending state change
requests (kafka.controller.RequestSendThread)
[2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-send-thread],
Controller 2 connected to Node(5, b5.kafka, 9092) for sending state change
requests (kafka.controller.RequestSendThread)
[2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-send-thread],
Controller 2 connected to Node(4, b4.kafka, 9092) for sending state change
requests (kafka.controller.RequestSendThread)
[2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-send-thread],
Controller 2 connected to Node(1, b1.kafka, 9092) for sending state change
requests (kafka.controller.RequestSendThread)
[2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-send-thread],
Controller 2 connected to Node(0, b0.kafka, 9092) for sending state change
requests (kafka.controller.RequestSendThread)
[2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-send-thread],
Controller 2 connected to Node(3, b3.kafka, 9092) for sending state change
requests (kafka.controller.RequestSendThread)
[2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,34],
[topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
[topic1,35], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,86],
[topic1,78], [topic1,82]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to trigger
partition rebalance (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred replicas by
broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
[topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
[topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
[topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
[topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
[topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
[topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
[topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
[topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
[topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
-> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
[topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
-> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
[topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
-> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
[topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
[topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
[topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
-> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
[topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
[topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
[topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
List(4, 2))) (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 0 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 5 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 1 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
broker 2 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 3 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 4 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37],
[topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
[topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
[topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,72],
[topic1,80]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,55])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37],
[topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
[topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
[topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5],
[topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6], [topic1,1],
[topic1,10], [topic1,14], [topic1,9], [topic1,15]) (kafka.controller.
IsrChangeNotificationListener)
[2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to trigger
partition rebalance (kafka.controller.KafkaController)
[2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred replicas by
broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
[topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
[topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
[topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
[topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
[topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
[topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
[topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
[topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
[topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
-> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
[topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
-> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
[topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
-> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
[topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
[topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
[topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
-> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
[topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
[topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
[topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
List(4, 2))) (kafka.controller.KafkaController)
[2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 0 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 5 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 1 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 2 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 3 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 4 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80],
[topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,92])
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80],
[topic1,84]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,22],
[topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21],
[topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42],
[topic1,46], [topic1,43], [topic1,23]) (kafka.controller.
IsrChangeNotificationListener)
[2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37],
[topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
[topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
[topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller.
IsrChangeNotificationListener)
[2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to trigger
partition rebalance (kafka.controller.KafkaController)
[2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred replicas by
broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
[topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
[topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
[topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
[topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
[topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
[topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
[topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
[topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
[topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
-> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
[topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
-> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
[topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
-> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
[topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
[topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
[topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
-> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
[topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
[topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
[topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
List(4, 2))) (kafka.controller.KafkaController)
[2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance ratio for
broker 0 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 5 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 1 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 2 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
broker 3 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
replica Map() (kafka.controller.KafkaController)
[2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance ratio for
broker 4 is 0.000000 (kafka.controller.KafkaController)
[2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67],
[topic1,95]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to
Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67],
[topic1,95]) (kafka.controller.IsrChangeNotificationListener)
[2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)

Thanks

Regards,
Mazhar Shaikh.



On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io> wrote:

> Yes, you can try setting it to -1 in 0.8.1, which is the equivalent of
> "all" in 0.9 and above.
>
> Thanks,
>
> Jun
>
> On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com
> >
> wrote:
>
> > Hi Jun,
> >
> > I'm using default configuration (ack=1),
> > changing it t0 all or 2 will not help, as the producer queue will be
> > exhausted is any kafka broker goes down for long time.
> >
> >
> > Thanks.
> >
> > Regards,
> > Mazhar Shaikh.
> >
> >
> > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Are you using acks=1 or acks=all in the producer? Only the latter
> > > guarantees acked messages won't be lost after leader failure.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <
> > > mazhar.shaikh...@gmail.com>
> > > wrote:
> > >
> > > > Hi Kafka Team,
> > > >
> > > > I'm using kafka (kafka_2.11-0.9.0.1) with librdkafka (0.8.1) API for
> > > > producer
> > > > During a run of 2hrs, I notice the total number of messaged ack'd by
> > > > librdkafka delivery report is greater than the maxoffset of a
> partition
> > > in
> > > > kafka broker.
> > > > I'm running kafka broker with replication factor of 2.
> > > >
> > > > Here, message has been lost between librdkafka - kafka broker.
> > > >
> > > > As librdkafka is providing success delivery report for all the
> > messages.
> > > >
> > > > Looks like kafka broker is dropping the messages after acknowledging
> > > > librdkafka.
> > > >
> > > > Requesting you help in solving this issue.
> > > >
> > > > Thank you.
> > > >
> > > >
> > > > Regards
> > > > Mazhar Shaikh
> > > >
> > >
> >
>

Reply via email to