Hi,

I’m performing a producing load test on two node kafka cluster built from the 
last 0.8.1 branch sources. I have topic loadtest with replication factor 2 and 
256 partitions. Initially both brokers are in ISR and leadership is balanced. 
When in the middle of the load test one broker was restarted (wasn’t able to go 
with controlled shutdown in specified time and was killed), I started receiving 
following errors which as far as I understand coming from replication:


On restarted broker

2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2] 
kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId: 
ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1 bytes; 
RequestInfo: [loadtest2,71] -> PartitionFetchInfo(0,104857600),[loadtest,85] -> 
PartitionFetchInfo(113676000,104857600),[loadtest,189] -> 
PartitionFetchInfo(112277000,104857600),[loadtest,21] -> 
PartitionFetchInfo(0,104857600),[loadtest,205] -> 
PartitionFetchInfo(112986000,104857600),[loadtest,141] -> 
PartitionFetchInfo(0,104857600),[loadtest,253] -> 
PartitionFetchInfo(0,104857600),[loadtest,77] -> 
PartitionFetchInfo(0,104857600),[loadtest,61] -> 
PartitionFetchInfo(112490000,104857600),[loadtest,229] -> 
PartitionFetchInfo(112805000,104857600),[loadtest,133] -> 
PartitionFetchInfo(0,104857600),[loadtest2,15] -> 
PartitionFetchInfo(0,104857600),[loadtest2,63] -> 
PartitionFetchInfo(0,104857600),[loadtest,181] -> 
PartitionFetchInfo(0,104857600),[loadtest,5] -> 
PartitionFetchInfo(112530000,104857600),[loadtest,29] -> 
PartitionFetchInfo(0,104857600),[loadtest,45] -> 
PartitionFetchInfo(113113000,104857600),[loadtest2,39] -> 
PartitionFetchInfo(0,104857600),[loadtest,37] -> 
PartitionFetchInfo(112145000,104857600),[loadtest,13] -> 
PartitionFetchInfo(112915000,104857600),[loadtest,237] -> 
PartitionFetchInfo(112896000,104857600),[loadtest,149] -> 
PartitionFetchInfo(113232000,104857600),[loadtest,117] -> 
PartitionFetchInfo(113100000,104857600),[loadtest,157] -> 
PartitionFetchInfo(0,104857600),[loadtest,165] -> 
PartitionFetchInfo(0,104857600),[loadtest,101] -> 
PartitionFetchInfo(0,104857600),[loadtest,93] -> 
PartitionFetchInfo(113025000,104857600),[loadtest,125] -> 
PartitionFetchInfo(112896000,104857600),[loadtest,197] -> 
PartitionFetchInfo(0,104857600),[loadtest,109] -> 
PartitionFetchInfo(0,104857600),[loadtest,245] -> 
PartitionFetchInfo(0,104857600),[loadtest,213] -> 
PartitionFetchInfo(0,104857600),[loadtest,53] -> 
PartitionFetchInfo(0,104857600),[loadtest,173] -> 
PartitionFetchInfo(112757000,104857600),[loadtest,69] -> 
PartitionFetchInfo(112378000,104857600),[loadtest,221] -> 
PartitionFetchInfo(0,104857600)
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
        at kafka.utils.Utils$.read(Utils.scala:376)
        at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-04-18 16:15:02,215 WARN  [ReplicaFetcherThread-1-2] 
kafka.consumer.SimpleConsumer - Reconnect due to socket error: null 


On current leader

2014-04-18 16:15:10,235 ERROR [kafka-processor-9092-1] kafka.network.Processor 
- Closing socket for /10.41.133.59 because of error
kafka.common.KafkaException: This operation cannot be completed on a complete 
request.
        at 
kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:744)


These errors are constantly bubbling up in logs and restarted broker never made 
it back to ISR even when the load test was stopped.


Kafka configuration:

host.name                                   = #{IP_ADDR}
port                                        = 9092
socket.send.buffer.bytes                    = 1048576
socket.receive.buffer.bytes                 = 1048576
socket.request.max.bytes                    = 104857600
num.io.threads                              = 24
queued.max.requests                         = 1024
fetch.purgatory.purge.interval.requests     = 1024
producer.purgatory.purge.interval.requests  = 1024



broker.id                                   = #{BROKER_ID}

log.flush.interval.messages                 = 100000
log.flush.scheduler.interval.ms             = 1000
log.flush.interval.ms                       = 2000

log.dirs                                    = \
/mnt1/tmp/kafka-logs,/mnt2/tmp/kafka-logs,/mnt3/tmp/kafka-logs,/mnt4/tmp/kafka-logs,\
/mnt5/tmp/kafka-logs,/mnt6/tmp/kafka-logs,/mnt7/tmp/kafka-logs,/mnt8/tmp/kafka-logs,\
/mnt9/tmp/kafka-logs,/mnt10/tmp/kafka-logs,/mnt11/tmp/kafka-logs,/mnt12/tmp/kafka-logs,\
/mnt13/tmp/kafka-logs,/mnt14/tmp/kafka-logs,/mnt15/tmp/kafka-logs,/mnt16/tmp/kafka-logs,\
/mnt17/tmp/kafka-logs,/mnt18/tmp/kafka-logs,/mnt19/tmp/kafka-logs,/mnt20/tmp/kafka-logs,\
/mnt21/tmp/kafka-logs,/mnt22/tmp/kafka-logs,/mnt23/tmp/kafka-logs,/mnt24/tmp/kafka-logs

log.segment.bytes                           = 1000000000
log.roll.hours                              = 1

log.retention.minutes                       = 10080
log.retention.check.interval.ms             = 300000
log.cleanup.policy                          = delete

log.index.size.max.bytes                    = 10485760

num.partitions                              = 256
auto.create.topics.enable                   = false


default.replication.factor                      = 2
replica.lag.time.max.ms                         = 15000
replica.lag.max.messages                        = 750000
num.replica.fetchers                            = 8
replica.socket.timeout.ms                       = 30000
replica.socket.receive.buffer.bytes             = 1048576
replica.fetch.max.bytes                         = 104857600
replica.fetch.wait.max.ms                       = 1000
replica.fetch.min.bytes                         = 1
replica.high.watermark.checkpoint.interval.ms   = 5000

controlled.shutdown.enable                      = true
controlled.shutdown.max.retries                 = 1
controlled.shutdown.retry.backoff.ms            = 300000

auto.leader.rebalance.enable                    = true
leader.imbalance.per.broker.percentage          = 10
leader.imbalance.check.interval.seconds         = 300


zookeeper.connect                           = #{ZK_PATH}
zookeeper.session.timeout.ms                = 30000
zookeeper.connection.timeout.ms             = 30000


Why this might happen?


Thanks, 
Alex

Reply via email to