Hi Team,
I test Kafka 0.8.1 and have problems with missing brokers. What I did:
1. Installed Zookeeper 3.3.6 servers on 3 dedicated machines
2. Installed Kafka 0.8.1 brokers on 3 machines (different from Zookeeper
ones)
3. Configured brokers as above:
# Replication configurations
num.replica.fetchers=2
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
controlled.shutdown.enable=true
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=1200
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=3000
4. Started Zookeeper and brokers. Created topics. In Zookeeper client I
saw:
ls /brokers/ids
[3, 2, 1]
5. Ran producers and consumers with very low traffic. All worked fine.
6. Increased traffic to ~100 messages per second and the controller
(ID=2 / machine 2) failed with broker on the same machine (I've seen few
times this scenario on my cluster).
My logs as following.
Machine 2:
Controller ID=2:
[2014-04-02 10:38:37,152] INFO
[Controller-2-to-broker-3-send-thread], Starting
(kafka.controller.RequestSendThread)
[2014-04-02 10:42:21,131] INFO [SessionExpirationListener on 2], ZK
expired; shut down all controller components and try to re-elect
(kafka.controller.KafkaController$SessionExpirationListener)
[2014-04-02 10:42:21,176] INFO [delete-topics-thread], Shutting down
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
Server ID=2:
[2014-04-02 10:38:39,113] INFO Partition [topic1,1] on broker 2:
Expanding ISR for partition [topic1,1] from 2,1 to 2,1,3
(kafka.cluster.Partition)
[2014-04-02 10:38:39,129] INFO Partition [topic2,1] on broker 2:
Expanding ISR for partition [topic2,1] from 2,1 to 2,1,3
(kafka.cluster.Partition)
[2014-04-02 10:38:39,488] INFO Closing socket connection to
/11.111.1.19. (kafka.network.Processor)
[2014-04-02 10:38:45,343] INFO Closing socket connection to
/11.111.1.19. (kafka.network.Processor)
[2014-04-02 10:39:21,842] INFO Closing socket connection to
/11.111.1.110. (kafka.network.Processor)
[2014-04-02 10:41:24,605] INFO Partition [topic3,1] on broker 2:
Shrinking ISR for partition [topic3,1] from 2,1,3 to 2,1
(kafka.cluster.Partition)
[2014-04-02 10:41:26,748] INFO Partition [topic3,1] on broker 2:
Expanding ISR for partition [topic3,1] from 2,1 to 2,1,3
(kafka.cluster.Partition)
[2014-04-02 10:42:22,727] INFO Closing socket connection to
/11.111.1.21. (kafka.network.Processor)
[2014-04-02 10:42:23,181] INFO Closing socket connection to
/11.111.1.21. (kafka.network.Processor)
[2014-04-02 10:42:24,351] INFO Closing socket connection to
/11.111.1.25. (kafka.network.Processor)
[2014-04-02 10:42:24,501] INFO Closing socket connection to
/11.111.1.25. (kafka.network.Processor)
[2014-04-02 10:42:27,915] ERROR [ReplicaFetcherThread-0-1], Error
for partition [topic2,4] to broker 1:class
kafka.common.NotLeaderForPartitionException
(kafka.server.ReplicaFetcherThread)
...
[2014-04-02 10:42:34,575] ERROR Conditional update of path
/brokers/topics/topic1/partitions/1/state with data
{"controller_epoch":7,"leader":2,"version":1,"leader_epoch":61,"isr":[2]}
and expected version 119 failed due to
org.apache.zookeeper.KeeperException$BadVersionException:
KeeperErrorCode = BadVersion for
/brokers/topics/topic1/partitions/1/state (kafka.utils.ZkUtils$)
[2014-04-02 10:42:34,575] INFO Partition [topic1,1] on broker 2:
Cached zkVersion [119] not equal to that in zookeeper, skip updating
ISR (kafka.cluster.Partition)
State-change ID=2:
[2014-04-02 10:38:37,134] TRACE Controller 2 epoch 7 received
response correlationId 313 for a request sent to broker
id:2,host:11.111.1.23,port:9092 (state.change.logger)
[2014-04-02 10:38:37,696] TRACE Controller 2 epoch 7 received
response correlationId 313 for a request sent to broker
id:1,host:11.111.1.21,port:9092 (state.change.logger)
[2014-04-02 10:38:38,682] TRACE Controller 2 epoch 7 received
response correlationId 312 for a request sent to broker
id:3,host:11.111.1.25,port:9092 (state.change.logger)
[2014-04-02 10:38:38,909] TRACE Controller 2 epoch 7 received
response correlationId 313 for a request sent to broker
id:3,host:11.111.1.25,port:9092 (state.change.logger)
[2014-04-02 10:38:38,918] TRACE Controller 2 epoch 7 received
response correlationId 313 for a request sent to broker
id:3,host:11.111.1.25,port:9092 (state.change.logger)
Machine 1:
Controller ID=1:
[2014-04-02 10:42:20,342] INFO [Controller 1]: Broker 1 starting
become controller state transition (kafka.controller.KafkaController)
[2014-04-02 10:42:20,372] INFO [Controller 1]: Controller 1
incremented epoch to 8 (kafka.controller.KafkaController)
[2014-04-02 10:42:21,176] DEBUG [Channel manager on controller 1]:
Controller 1 trying to connect to broker 1
(kafka.controller.ControllerChannelManager)
[2014-04-02 10:42:21,238] INFO
[Controller-1-to-broker-1-send-thread], Controller 1 connected to
id:1,host:11.111.1.21,port:9092 for sending state change requests
(kafka.controller.RequestSendThread)
[2014-04-02 10:42:21,239] DEBUG [Channel manager on controller 1]:
Controller 1 trying to connect to broker 3
(kafka.controller.ControllerChannelManager)
[2014-04-02 10:42:21,460] INFO
[Controller-1-to-broker-3-send-thread], Controller 1 connected to
id:3,host:11.111.1.25,port:9092 for sending state change requests
(kafka.controller.RequestSendThread)
[2014-04-02 10:42:21,483] INFO
[Controller-1-to-broker-3-send-thread], Starting
(kafka.controller.RequestSendThread)
[2014-04-02 10:42:21,483] INFO
[Controller-1-to-broker-1-send-thread], Starting
(kafka.controller.RequestSendThread)
[2014-04-02 10:42:21,506] INFO [Controller 1]: Partitions undergoing
preferred replica election: (kafka.controller.KafkaController)
[2014-04-02 10:42:21,515] INFO [Controller 1]: Partitions that
completed preferred replica election:
(kafka.controller.KafkaController)
[2014-04-02 10:42:21,516] INFO [Controller 1]: Resuming preferred
replica election for partitions: (kafka.controller.KafkaController)
[2014-04-02 10:42:21,653] INFO [Controller 1]: Partitions being
reassigned: Map() (kafka.controller.KafkaController)
[2014-04-02 10:42:21,655] INFO [Controller 1]: Partitions already
reassigned: List() (kafka.controller.KafkaController)
[2014-04-02 10:42:21,656] INFO [Controller 1]: Resuming reassignment
of partitions: Map() (kafka.controller.KafkaController)
[2014-04-02 10:42:21,742] INFO [Controller 1]: List of topics to be
deleted: (kafka.controller.KafkaController)
[2014-04-02 10:42:21,743] INFO [Controller 1]: List of topics
ineligible for deletion: topic1,topic2,topic3
(kafka.controller.KafkaController)
[2014-04-02 10:42:21,848] INFO [Controller 1]: Currently active
brokers in the cluster: Set(1, 3) (kafka.controller.KafkaController)
[2014-04-02 10:42:21,849] INFO [Controller 1]: Currently shutting
brokers in the cluster: Set() (kafka.controller.KafkaController)
After failover, Controller is on machine 1 and broker ID=2 disappeared
in Zookeeper:
ls /brokers/ids
[3, 1]
Only restart broker ID=2 helps in order to back. My questionis:
Why broker is not able to back and works?
(I see problem with the bad version in Zookeeper but I would like to
better understand it and prevent in future)
Thanks,
Chris