Hi, I’m performing a producing load test on two node kafka cluster built from the last 0.8.1 branch sources. I have topic loadtest with replication factor 2 and 256 partitions. Initially both brokers are in ISR and leadership is balanced. When in the middle of the load test one broker was restarted (wasn’t able to go with controlled shutdown in specified time and was killed), I started receiving following errors which as far as I understand coming from replication:
On restarted broker 2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2] kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId: ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [loadtest2,71] -> PartitionFetchInfo(0,104857600),[loadtest,85] -> PartitionFetchInfo(113676000,104857600),[loadtest,189] -> PartitionFetchInfo(112277000,104857600),[loadtest,21] -> PartitionFetchInfo(0,104857600),[loadtest,205] -> PartitionFetchInfo(112986000,104857600),[loadtest,141] -> PartitionFetchInfo(0,104857600),[loadtest,253] -> PartitionFetchInfo(0,104857600),[loadtest,77] -> PartitionFetchInfo(0,104857600),[loadtest,61] -> PartitionFetchInfo(112490000,104857600),[loadtest,229] -> PartitionFetchInfo(112805000,104857600),[loadtest,133] -> PartitionFetchInfo(0,104857600),[loadtest2,15] -> PartitionFetchInfo(0,104857600),[loadtest2,63] -> PartitionFetchInfo(0,104857600),[loadtest,181] -> PartitionFetchInfo(0,104857600),[loadtest,5] -> PartitionFetchInfo(112530000,104857600),[loadtest,29] -> PartitionFetchInfo(0,104857600),[loadtest,45] -> PartitionFetchInfo(113113000,104857600),[loadtest2,39] -> PartitionFetchInfo(0,104857600),[loadtest,37] -> PartitionFetchInfo(112145000,104857600),[loadtest,13] -> PartitionFetchInfo(112915000,104857600),[loadtest,237] -> PartitionFetchInfo(112896000,104857600),[loadtest,149] -> PartitionFetchInfo(113232000,104857600),[loadtest,117] -> PartitionFetchInfo(113100000,104857600),[loadtest,157] -> PartitionFetchInfo(0,104857600),[loadtest,165] -> PartitionFetchInfo(0,104857600),[loadtest,101] -> PartitionFetchInfo(0,104857600),[loadtest,93] -> PartitionFetchInfo(113025000,104857600),[loadtest,125] -> PartitionFetchInfo(112896000,104857600),[loadtest,197] -> PartitionFetchInfo(0,104857600),[loadtest,109] -> PartitionFetchInfo(0,104857600),[loadtest,245] -> PartitionFetchInfo(0,104857600),[loadtest,213] -> PartitionFetchInfo(0,104857600),[loadtest,53] -> PartitionFetchInfo(0,104857600),[loadtest,173] -> PartitionFetchInfo(112757000,104857600),[loadtest,69] -> PartitionFetchInfo(112378000,104857600),[loadtest,221] -> PartitionFetchInfo(0,104857600) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 2014-04-18 16:15:02,215 WARN [ReplicaFetcherThread-1-2] kafka.consumer.SimpleConsumer - Reconnect due to socket error: null On current leader 2014-04-18 16:15:10,235 ERROR [kafka-processor-9092-1] kafka.network.Processor - Closing socket for /10.41.133.59 because of error kafka.common.KafkaException: This operation cannot be completed on a complete request. at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34) at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191) at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214) at kafka.network.Processor.write(SocketServer.scala:375) at kafka.network.Processor.run(SocketServer.scala:247) at java.lang.Thread.run(Thread.java:744) These errors are constantly bubbling up in logs and restarted broker never made it back to ISR even when the load test was stopped. Kafka configuration: host.name = #{IP_ADDR} port = 9092 socket.send.buffer.bytes = 1048576 socket.receive.buffer.bytes = 1048576 socket.request.max.bytes = 104857600 num.io.threads = 24 queued.max.requests = 1024 fetch.purgatory.purge.interval.requests = 1024 producer.purgatory.purge.interval.requests = 1024 broker.id = #{BROKER_ID} log.flush.interval.messages = 100000 log.flush.scheduler.interval.ms = 1000 log.flush.interval.ms = 2000 log.dirs = \ /mnt1/tmp/kafka-logs,/mnt2/tmp/kafka-logs,/mnt3/tmp/kafka-logs,/mnt4/tmp/kafka-logs,\ /mnt5/tmp/kafka-logs,/mnt6/tmp/kafka-logs,/mnt7/tmp/kafka-logs,/mnt8/tmp/kafka-logs,\ /mnt9/tmp/kafka-logs,/mnt10/tmp/kafka-logs,/mnt11/tmp/kafka-logs,/mnt12/tmp/kafka-logs,\ /mnt13/tmp/kafka-logs,/mnt14/tmp/kafka-logs,/mnt15/tmp/kafka-logs,/mnt16/tmp/kafka-logs,\ /mnt17/tmp/kafka-logs,/mnt18/tmp/kafka-logs,/mnt19/tmp/kafka-logs,/mnt20/tmp/kafka-logs,\ /mnt21/tmp/kafka-logs,/mnt22/tmp/kafka-logs,/mnt23/tmp/kafka-logs,/mnt24/tmp/kafka-logs log.segment.bytes = 1000000000 log.roll.hours = 1 log.retention.minutes = 10080 log.retention.check.interval.ms = 300000 log.cleanup.policy = delete log.index.size.max.bytes = 10485760 num.partitions = 256 auto.create.topics.enable = false default.replication.factor = 2 replica.lag.time.max.ms = 15000 replica.lag.max.messages = 750000 num.replica.fetchers = 8 replica.socket.timeout.ms = 30000 replica.socket.receive.buffer.bytes = 1048576 replica.fetch.max.bytes = 104857600 replica.fetch.wait.max.ms = 1000 replica.fetch.min.bytes = 1 replica.high.watermark.checkpoint.interval.ms = 5000 controlled.shutdown.enable = true controlled.shutdown.max.retries = 1 controlled.shutdown.retry.backoff.ms = 300000 auto.leader.rebalance.enable = true leader.imbalance.per.broker.percentage = 10 leader.imbalance.check.interval.seconds = 300 zookeeper.connect = #{ZK_PATH} zookeeper.session.timeout.ms = 30000 zookeeper.connection.timeout.ms = 30000 Why this might happen? Thanks, Alex