Hi Jun, Setting to -1, may solve this issue. But it will cause producer buffer full in load test resulting to failures and drop of messages from client(producer side) Hence, this will not actually solve the problem.
I need to fix this from kafka broker side, so that there is no impact on producer or consumer. >From the logs looks like there is connection problem during between brokers and kafka broker is loosing records during this process. But why is kafka broker loosing records, I feel this is a BUG in kafka. [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,36], [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74], [topic1,82], [topic1,33]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_ epoch=2,partition_states=[{topic=topic1,partition=82, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=30, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=78, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=12,replicas=[5,3]},{topic=topic1,partition=86, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=31, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=20,replicas=[3,5]},{topic=topic1,partition=36, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=74, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=33, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]}],live_brokers=[{id=5,end_ points=[{port=9092,host=b5.kafka,security_protocol_type= 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_ protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2. kafka,security_protocol_type=0}]},{id=1,end_points=[{port= 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_ points=[{port=9092,host=b4.kafka,security_protocol_type= 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]} to broker Node(2, b2.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread) java.io.IOException: Connection to 2 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:87) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:84) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80) at kafka.utils.NetworkClientBlockingOps$.recurse$1( NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$ NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps. scala:139) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$ extension(NetworkClientBlockingOps.scala:80) at kafka.controller.RequestSendThread.liftedTree1$ 1(ControllerChannelManager.scala:180) at kafka.controller.RequestSendThread.doWork( ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_ epoch=2,partition_states=[{topic=topic1,partition=82, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=30, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=78, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=12,replicas=[5,3]},{topic=topic1,partition=86, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=31, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=20,replicas=[3,5]},{topic=topic1,partition=36, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=74, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=33, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]}],live_brokers=[{id=0,end_ points=[{port=9092,host=b0.kafka,security_protocol_type= 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_ protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3. kafka,security_protocol_type=0}]},{id=1,end_points=[{port= 9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_ points=[{port=9092,host=b2.kafka,security_protocol_type= 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]} to broker Node(5, b5.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread) java.io.IOException: Connection to 5 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:87) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:84) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80) at kafka.utils.NetworkClientBlockingOps$.recurse$1( NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$ NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps. scala:139) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$ extension(NetworkClientBlockingOps.scala:80) at kafka.controller.RequestSendThread.liftedTree1$ 1(ControllerChannelManager.scala:180) at kafka.controller.RequestSendThread.doWork( ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_ epoch=2,partition_states=[{topic=topic1,partition=82, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=30, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=78, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=12,replicas=[5,3]},{topic=topic1,partition=86, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=31, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=20,replicas=[3,5]},{topic=topic1,partition=36, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=74, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=33, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]}],live_brokers=[{id=3,end_ points=[{port=9092,host=b3.kafka,security_protocol_type= 0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_ protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4. kafka,security_protocol_type=0}]},{id=2,end_points=[{port= 9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_ points=[{port=9092,host=b0.kafka,security_protocol_type= 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]}]} to broker Node(4, b4.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread) java.io.IOException: Connection to 4 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:87) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:84) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80) at kafka.utils.NetworkClientBlockingOps$.recurse$1( NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$ NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps. scala:139) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$ extension(NetworkClientBlockingOps.scala:80) at kafka.controller.RequestSendThread.liftedTree1$ 1(ControllerChannelManager.scala:180) at kafka.controller.RequestSendThread.doWork( ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_ epoch=2,partition_states=[{topic=topic1,partition=82, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=30, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=78, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=12,replicas=[5,3]},{topic=topic1,partition=86, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=31, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=20,replicas=[3,5]},{topic=topic1,partition=36, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=74, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=33, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]}],live_brokers=[{id=5,end_ points=[{port=9092,host=b5.kafka,security_protocol_type= 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_ protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3. kafka,security_protocol_type=0}]},{id=1,end_points=[{port= 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_ points=[{port=9092,host=b4.kafka,security_protocol_type= 0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]}]} to broker Node(1, b1.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread) java.io.IOException: Connection to 1 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:87) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:84) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80) at kafka.utils.NetworkClientBlockingOps$.recurse$1( NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$ NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps. scala:139) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$ extension(NetworkClientBlockingOps.scala:80) at kafka.controller.RequestSendThread.liftedTree1$ 1(ControllerChannelManager.scala:180) at kafka.controller.RequestSendThread.doWork( ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_ epoch=2,partition_states=[{topic=topic1,partition=82, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=30, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=78, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=12,replicas=[5,3]},{topic=topic1,partition=86, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=31, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=20,replicas=[3,5]},{topic=topic1,partition=36, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=74, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=33, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]}],live_brokers=[{id=3,end_ points=[{port=9092,host=b3.kafka,security_protocol_type= 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_ protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5. kafka,security_protocol_type=0}]},{id=2,end_points=[{port= 9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_ points=[{port=9092,host=b1.kafka,security_protocol_type= 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]} to broker Node(3, b3.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread) java.io.IOException: Connection to 3 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:87) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:84) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80) at kafka.utils.NetworkClientBlockingOps$.recurse$1( NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$ NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps. scala:139) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$ extension(NetworkClientBlockingOps.scala:80) at kafka.controller.RequestSendThread.liftedTree1$ 1(ControllerChannelManager.scala:180) at kafka.controller.RequestSendThread.doWork( ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_ epoch=2,partition_states=[{topic=topic1,partition=82, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=30, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=78, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=12,replicas=[5,3]},{topic=topic1,partition=86, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=31, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=20,replicas=[3,5]},{topic=topic1,partition=36, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]},{topic=topic1,partition=74, controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_ version=14,replicas=[5,3]},{topic=topic1,partition=33, controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_ version=18,replicas=[3,5]}],live_brokers=[{id=4,end_ points=[{port=9092,host=b4.kafka,security_protocol_type= 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_ protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1. kafka,security_protocol_type=0}]},{id=2,end_points=[{port= 9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_ points=[{port=9092,host=b5.kafka,security_protocol_type= 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]} to broker Node(0, b0.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread) java.io.IOException: Connection to 0 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:87) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1$$anonfun$apply$1.apply( NetworkClientBlockingOps.scala:84) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$$anonfun$ blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80) at kafka.utils.NetworkClientBlockingOps$.recurse$1( NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$ NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps. scala:139) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$ extension(NetworkClientBlockingOps.scala:80) at kafka.controller.RequestSendThread.liftedTree1$ 1(ControllerChannelManager.scala:180) at kafka.controller.RequestSendThread.doWork( ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,50]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-send-thread], Controller 2 connected to Node(2, b2.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread) [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-send-thread], Controller 2 connected to Node(5, b5.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread) [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-send-thread], Controller 2 connected to Node(4, b4.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread) [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-send-thread], Controller 2 connected to Node(1, b1.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread) [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-send-thread], Controller 2 connected to Node(0, b0.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread) [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-send-thread], Controller 2 connected to Node(3, b3.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread) [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,34], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,35], [topic1,33]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,86], [topic1,78], [topic1,82]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37], [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39], [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,72], [topic1,80]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,55]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37], [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39], [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5], [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6], [topic1,1], [topic1,10], [topic1,14], [topic1,9], [topic1,15]) (kafka.controller. IsrChangeNotificationListener) [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController) [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController) [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80], [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,92]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80], [topic1,84]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,22], [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21], [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42], [topic1,46], [topic1,43], [topic1,23]) (kafka.controller. IsrChangeNotificationListener) [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37], [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller. IsrChangeNotificationListener) [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController) [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController) [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController) [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67], [topic1,95]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67], [topic1,95]) (kafka.controller.IsrChangeNotificationListener) [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener) Thanks Regards, Mazhar Shaikh. On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io> wrote: > Yes, you can try setting it to -1 in 0.8.1, which is the equivalent of > "all" in 0.9 and above. > > Thanks, > > Jun > > On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com > > > wrote: > > > Hi Jun, > > > > I'm using default configuration (ack=1), > > changing it t0 all or 2 will not help, as the producer queue will be > > exhausted is any kafka broker goes down for long time. > > > > > > Thanks. > > > > Regards, > > Mazhar Shaikh. > > > > > > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io> wrote: > > > > > Are you using acks=1 or acks=all in the producer? Only the latter > > > guarantees acked messages won't be lost after leader failure. > > > > > > Thanks, > > > > > > Jun > > > > > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh < > > > mazhar.shaikh...@gmail.com> > > > wrote: > > > > > > > Hi Kafka Team, > > > > > > > > I'm using kafka (kafka_2.11-0.9.0.1) with librdkafka (0.8.1) API for > > > > producer > > > > During a run of 2hrs, I notice the total number of messaged ack'd by > > > > librdkafka delivery report is greater than the maxoffset of a > partition > > > in > > > > kafka broker. > > > > I'm running kafka broker with replication factor of 2. > > > > > > > > Here, message has been lost between librdkafka - kafka broker. > > > > > > > > As librdkafka is providing success delivery report for all the > > messages. > > > > > > > > Looks like kafka broker is dropping the messages after acknowledging > > > > librdkafka. > > > > > > > > Requesting you help in solving this issue. > > > > > > > > Thank you. > > > > > > > > > > > > Regards > > > > Mazhar Shaikh > > > > > > > > > >