[ https://issues.apache.org/jira/browse/KAFKA-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kornev updated KAFKA-10363:
----------------------------------
    Description: 
We've just successfully set up a Kafka cluster consisting of 3 brokers and faced 
the following issue: when we change the order of the ZooKeeper servers in the 
zookeeper.connect property in server.properties and restart the Kafka broker, 
the broker tries to connect to a new Kafka cluster. As a result, the broker 
throws an error and shuts down. 

For example, server.properties on the first broker originally contained:
{code:java}
broker.id=-1
...
zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka
{code}
We changed it to
{code:java}
broker.id=-1
...
zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka
{code}
and restarted the Kafka broker. 
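The "Created zookeeper path /kafka,node_3:2181/kafka,node_1:2181/kafka" line in the logs below suggests why the order matters. Here is a minimal sketch (our reconstruction, not the actual Kafka source) of how kafka.server.KafkaServer appears to derive the ZooKeeper chroot in 2.3.x: everything after the *first* "/" in the whole connect string is treated as the chroot, rather than a per-host suffix.
{code:scala}
// Hypothetical sketch of the chroot parsing, not the exact Kafka code.
def chrootOf(zkConnect: String): Option[String] = {
  val chrootIndex = zkConnect.indexOf("/")
  if (chrootIndex > 0) Some(zkConnect.substring(chrootIndex)) else None
}

chrootOf("node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka")
// => Some("/kafka,node_2:2181/kafka,node_3:2181/kafka")
chrootOf("node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka")
// => Some("/kafka,node_3:2181/kafka,node_1:2181/kafka")
// This matches the "Created zookeeper path ..." line in the logs below,
// so each host order yields a different chroot and therefore a
// different (freshly created) cluster.
{code}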

Logs:
{code:java}
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-08-05 09:07:57,656] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-08-05 09:07:57,657] INFO starting (kafka.server.KafkaServer)
[2020-08-05 09:07:57,658] INFO Connecting to zookeeper on node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)
[2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,690] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,693] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,693] INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client environment:java.vendor=Ubuntu (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client
environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/current/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/current/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/current/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/current/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/current/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/current/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/current/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/current/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/current/bin/../libs/jersey-client-2.28.jar:/opt/kafka/current/bin/../libs/jersey-common-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-core-2.28.jar:/opt/kafka/current/bin/../libs/jersey-hk2-2.28.jar:/opt/kafka/current/bin/../libs/jersey-media-jaxb-2.28.jar:/opt/kafka/current/bin/../libs/jersey-server-2.28.jar:/opt/kafka/current/bin/../libs/jetty-client-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-continuation-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-http-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-io-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-security-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-server-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-servlet-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-servlets-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-util-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/current/bin/../libs/jsr305-3.0.2.jar:/opt/kafka/current/bin/../libs/kafka-clients-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-log4j-appender-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-examples-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-scala_2.12-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-test-utils-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-tools-2.3.1.jar:/opt/ka
fka/current/bin/../libs/kafka_2.12-2.3.1-sources.jar:/opt/kafka/current/bin/../libs/kafka_2.12-2.3.1.jar:/opt/kafka/current/bin/../libs/log4j-1.2.17.jar:/opt/kafka/current/bin/../libs/lz4-java-1.6.0.jar:/opt/kafka/current/bin/../libs/maven-artifact-3.6.1.jar:/opt/kafka/current/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/current/bin/../libs/osgi-resource-locator-1.0.1.jar:/opt/kafka/current/bin/../libs/paranamer-2.8.jar:/opt/kafka/current/bin/../libs/plexus-utils-3.2.0.jar:/opt/kafka/current/bin/../libs/reflections-0.9.11.jar:/opt/kafka/current/bin/../libs/rocksdbjni-5.18.3.jar:/opt/kafka/current/bin/../libs/scala-library-2.12.10.jar:/opt/kafka/current/bin/../libs/scala-library-2.12.8.jar:/opt/kafka/current/bin/../libs/scala-logging_2.12-3.9.0.jar:/opt/kafka/current/bin/../libs/scala-reflect-2.12.8.jar:/opt/kafka/current/bin/../libs/slf4j-api-1.7.26.jar:/opt/kafka/current/bin/../libs/slf4j-log4j12-1.7.26.jar:/opt/kafka/current/bin/../libs/snappy-java-1.1.7.3.jar:/opt/kafka/current/bin/../libs/spotbugs-annotations-3.1.9.jar:/opt/kafka/current/bin/../libs/validation-api-2.0.1.Final.jar:/opt/kafka/current/bin/../libs/zkclient-0.11.jar:/opt/kafka/current/bin/../libs/zookeeper-3.4.14.jar:/opt/kafka/current/bin/../libs/zstd-jni-1.4.0-1.jar
 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,697] INFO Client
environment:java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,697] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,697] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,697] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,697] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,698] INFO Client environment:os.version=4.15.0-66-generic (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,698] INFO Client environment:user.name=kafka (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,698] INFO Client environment:user.home=/opt/kafka (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,698] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,699] INFO Initiating client connection, connectString=node_2:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@74bada02 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,718] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,818] INFO Client successfully logged in. (org.apache.zookeeper.Login)
[2020-08-05 09:07:57,821] INFO Client will use DIGEST-MD5 as SASL mechanism. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2020-08-05 09:07:57,826] INFO Opening socket connection to server node_2/node_2:2181. Will attempt to SASL-authenticate using Login Context section 'Client' (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,832] INFO Socket connection established to node_2/node_2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,841] INFO Session establishment complete on server node_2/node_2:2181, sessionid = 0x373bdbbd3b00002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,847] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,925] INFO Created zookeeper path /kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)
[2020-08-05 09:07:57,926] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,933] INFO Session: 0x373bdbbd3b00002 closed (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,934] INFO EventThread shut down for session: 0x373bdbbd3b00002 (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,937] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,939] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,939] INFO Initiating client connection, connectString=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@6ff65192 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,940] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,941] INFO Client will use DIGEST-MD5 as SASL mechanism. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2020-08-05 09:07:57,943] INFO Opening socket connection to server node_2/node_2:2181. Will attempt to SASL-authenticate using Login Context section 'Client' (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,944] INFO Socket connection established to node_2/node_2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,949] INFO Session establishment complete on server node_2/node_2:2181, sessionid = 0x373bdbbd3b00003, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-08-05 09:07:57,950] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:58,167] INFO Cluster ID = 5_d5S6HeQBWf0ZzwQ6TjRA (kafka.server.KafkaServer)
[2020-08-05 09:07:58,253] INFO KafkaConfig values:  advertised.host.name = node_3 advertised.listeners =
SSL://node_3:9093 advertised.port = null alter.config.policy.class.name = null 
alter.log.dirs.replication.quota.window.num = 11 
alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name 
= kafka.security.auth.SimpleAclAuthorizer auto.create.topics.enable = true 
auto.leader.rebalance.enable = true background.threads = 10 broker.id = -1 
broker.id.generation.enable = true broker.rack = null 
client.quota.callback.class = null compression.type = producer 
connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 
600000 connections.max.reauth.ms = 0 control.plane.listener.name = null 
controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 
controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 
30000 create.topic.policy.class.name = null default.replication.factor = 2 
delegation.token.expiry.check.interval.ms = 3600000 
delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null 
delegation.token.max.lifetime.ms = 604800000 
delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true 
fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms 
= 3000 group.max.session.timeout.ms = 60000 group.max.size = 2147483647 
group.min.session.timeout.ms = 10000 host.name = node_3 
inter.broker.listener.name = null inter.broker.protocol.version = 2.3-IV1 
kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] 
leader.imbalance.check.interval.seconds = 300 
leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = 
SSL:SSL listeners = SSL://node_3:9093 log.cleaner.backoff.ms = 15000 
log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 
86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 
log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 
1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 9223372036854775807 
log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 
log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs 
log.dirs = /var/lib/kafka log.flush.interval.messages = 5000 
log.flush.interval.ms = 5000 log.flush.offset.checkpoint.interval.ms = 60000 
log.flush.scheduler.interval.ms = 9223372036854775807 
log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes 
= 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = 
true log.message.format.version = 2.3-IV1 
log.message.timestamp.difference.max.ms = 9223372036854775807 
log.message.timestamp.type = CreateTime log.preallocate = false 
log.retention.bytes = -1 log.retention.check.interval.ms = 300000 
log.retention.hours = 336 log.retention.minutes = null log.retention.ms = null 
log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null 
log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 
60000 max.connections = 2147483647 max.connections.per.ip = 2147483647 
max.connections.per.ip.overrides =  max.incremental.fetch.session.cache.slots = 
1000 message.max.bytes = 2000024 metric.reporters = [] metrics.num.samples = 2 
metrics.recording.level = INFO metrics.sample.window.ms = 30000 
min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 
num.partitions = 4 num.recovery.threads.per.data.dir = 4 
num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 
offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 
offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 
offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 10080 
offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 
offsets.topic.replication.factor = 2 offsets.topic.segment.bytes = 104857600 
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding 
password.encoder.iterations = 4096 password.encoder.key.length = 128 
password.encoder.keyfactory.algorithm = null password.encoder.old.secret = null 
password.encoder.secret = null port = 9092 principal.builder.class = null 
producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 
queued.max.requests = 500 quota.consumer.default = 9223372036854775807 
quota.producer.default = 9223372036854775807 quota.window.num = 11 
quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 
replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 
replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 
replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 
30000 replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 
30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 
1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 
sasl.client.callback.handler.class = null sasl.enabled.mechanisms = [GSSAPI] 
sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = 
null sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.login.callback.handler.class = null sasl.login.class = null 
sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 
60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 
0.05 sasl.mechanism.inter.broker.protocol = GSSAPI 
sasl.server.callback.handler.class = null security.inter.broker.protocol = SSL 
socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 
socket.send.buffer.bytes = 102400 ssl.cipher.suites = [] ssl.client.auth = 
required ssl.enabled.protocols = [TLSv1.2] 
ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden] 
ssl.keymanager.algorithm = SunX509 ssl.keystore.location = 
/etc/ssl/kafka/kafka.keystore.jks ssl.keystore.password = [hidden] 
ssl.keystore.type = JKS ssl.principal.mapping.rules = [DEFAULT] ssl.protocol = 
TLS ssl.provider = null ssl.secure.random.implementation = SHA1PRNG 
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = 
/etc/ssl/kafka/kafka.truststore.jks ssl.truststore.password = [hidden] 
ssl.truststore.type = JKS 
transaction.abort.timed.out.transaction.cleanup.interval.ms = 180000 
transaction.max.timeout.ms = 900000 
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 
transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr 
= 2 transaction.state.log.num.partitions = 50 
transaction.state.log.replication.factor = 2 
transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms 
= 604800000 unclean.leader.election.enable = false zookeeper.connect = 
node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka 
zookeeper.connection.timeout.ms = 15000 zookeeper.max.in.flight.requests = 10 
zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = true 
zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig)
[2020-08-05 09:07:58,272] INFO KafkaConfig values:  advertised.host.name = node_3
advertised.listeners = SSL://node_3:9093 advertised.port = null 
alter.config.policy.class.name = null 
alter.log.dirs.replication.quota.window.num = 11 
alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name 
= kafka.security.auth.SimpleAclAuthorizer auto.create.topics.enable = true 
auto.leader.rebalance.enable = true background.threads = 10 broker.id = -1 
broker.id.generation.enable = true broker.rack = null 
client.quota.callback.class = null compression.type = producer 
connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 
600000 connections.max.reauth.ms = 0 control.plane.listener.name = null 
controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 
controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 
30000 create.topic.policy.class.name = null default.replication.factor = 2 
delegation.token.expiry.check.interval.ms = 3600000 
delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null 
delegation.token.max.lifetime.ms = 604800000 
delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true 
fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms 
= 3000 group.max.session.timeout.ms = 60000 group.max.size = 2147483647 
group.min.session.timeout.ms = 10000 host.name = node_3 
inter.broker.listener.name = null inter.broker.protocol.version = 2.3-IV1 
kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] 
leader.imbalance.check.interval.seconds = 300 
leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = 
SSL:SSL listeners = SSL://node_3:9093 log.cleaner.backoff.ms = 15000 
log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 
86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 
log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 
1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 9223372036854775807 
log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 
log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs 
log.dirs = /var/lib/kafka log.flush.interval.messages = 5000 
log.flush.interval.ms = 5000 log.flush.offset.checkpoint.interval.ms = 60000 
log.flush.scheduler.interval.ms = 9223372036854775807 
log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes 
= 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = 
true log.message.format.version = 2.3-IV1 
log.message.timestamp.difference.max.ms = 9223372036854775807 
log.message.timestamp.type = CreateTime log.preallocate = false 
log.retention.bytes = -1 log.retention.check.interval.ms = 300000 
log.retention.hours = 336 log.retention.minutes = null log.retention.ms = null 
log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null 
log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 
60000 max.connections = 2147483647 max.connections.per.ip = 2147483647 
max.connections.per.ip.overrides =  max.incremental.fetch.session.cache.slots = 
1000 message.max.bytes = 2000024 metric.reporters = [] metrics.num.samples = 2 
metrics.recording.level = INFO metrics.sample.window.ms = 30000 
min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 
num.partitions = 4 num.recovery.threads.per.data.dir = 4 
num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 
offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 
offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 
offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 10080 
offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 
offsets.topic.replication.factor = 2 offsets.topic.segment.bytes = 104857600 
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding 
password.encoder.iterations = 4096 password.encoder.key.length = 128 
password.encoder.keyfactory.algorithm = null password.encoder.old.secret = null 
password.encoder.secret = null port = 9092 principal.builder.class = null 
producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 
queued.max.requests = 500 quota.consumer.default = 9223372036854775807 
quota.producer.default = 9223372036854775807 quota.window.num = 11 
quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 
replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 
replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 
replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 
30000 replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 
30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 
1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 
sasl.client.callback.handler.class = null sasl.enabled.mechanisms = [GSSAPI] 
sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = 
null sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.login.callback.handler.class = null sasl.login.class = null 
sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 
60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 
0.05 sasl.mechanism.inter.broker.protocol = GSSAPI 
sasl.server.callback.handler.class = null security.inter.broker.protocol = SSL 
socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 
socket.send.buffer.bytes = 102400 ssl.cipher.suites = [] ssl.client.auth = 
required ssl.enabled.protocols = [TLSv1.2] 
ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden] 
ssl.keymanager.algorithm = SunX509 ssl.keystore.location = 
/etc/ssl/kafka/kafka.keystore.jks ssl.keystore.password = [hidden] 
ssl.keystore.type = JKS ssl.principal.mapping.rules = [DEFAULT] ssl.protocol = 
TLS ssl.provider = null ssl.secure.random.implementation = SHA1PRNG 
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = 
/etc/ssl/kafka/kafka.truststore.jks ssl.truststore.password = [hidden] 
ssl.truststore.type = JKS 
transaction.abort.timed.out.transaction.cleanup.interval.ms = 180000 
transaction.max.timeout.ms = 900000 
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 
transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr 
= 2 transaction.state.log.num.partitions = 50 
transaction.state.log.replication.factor = 2 
transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms 
= 604800000 unclean.leader.election.enable = false zookeeper.connect = 
node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka 
zookeeper.connection.timeout.ms = 15000 zookeeper.max.in.flight.requests = 10 
zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = true 
zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig)
[2020-08-05 09:07:58,328] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-08-05 09:07:58,328] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-08-05 09:07:58,331] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-08-05 09:07:58,361] INFO Loading logs. (kafka.log.LogManager)
[2020-08-05 09:07:58,374] INFO Logs loading complete in 13 ms. (kafka.log.LogManager)
[2020-08-05 09:07:58,403] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-08-05 09:07:58,407] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-08-05 09:07:58,805] INFO Awaiting socket connections on node_3:9093. (kafka.network.Acceptor)
[2020-08-05 09:08:00,594] INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(node_3,9093,ListenerName(SSL),SSL) (kafka.network.SocketServer)
[2020-08-05 09:08:00,596] INFO [SocketServer brokerId=0] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2020-08-05 09:08:00,629] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,630] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,630] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,631] INFO [ExpirationReaper-0-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,660] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-08-05 09:08:00,726] INFO Creating /brokers/ids/0 (is it secure? true) (kafka.zk.KafkaZkClient)
[2020-08-05 09:08:00,751] ERROR Error while creating ephemeral at /brokers/ids/0, node already exists and owner '248751018843570177' does not match current session '248751018843570179' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2020-08-05 09:08:00,757] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:122)
        at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1784)
        at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1722)
        at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
        at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:262)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
        at kafka.Kafka$.main(Kafka.scala:84)
        at kafka.Kafka.main(Kafka.scala)
[2020-08-05 09:08:00,763] INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)
{code}
 

As you can see, this broker tries to join the cluster with ID 
_5_d5S6HeQBWf0ZzwQ6TjRA_, while the other brokers are connected to the cluster 
with ID _OhWuEGMeQHe66HP74rurRA_.
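One way to compare the two IDs directly is to read the cluster-id znodes with the zkCli.sh tool shipped with ZooKeeper (a sketch; it assumes Kafka's standard /cluster/id layout under each chroot, and the expected outputs in the comments are inferred from this report, not captured):
{code:java}
# Cluster ID under the intended chroot (what the other brokers use);
# expected to show id OhWuEGMeQHe66HP74rurRA:
zkCli.sh -server node_1:2181 get /kafka/cluster/id

# Cluster ID under the bogus chroot created after the reorder;
# expected to show id 5_d5S6HeQBWf0ZzwQ6TjRA:
zkCli.sh -server node_1:2181 get "/kafka,node_3:2181/kafka,node_1:2181/kafka/cluster/id"
{code}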

If we restore the original order of the ZooKeeper servers in the config file, 
the Kafka broker starts normally and connects to the existing cluster.

This issue blocks us from adding new nodes to the cluster and removing the old 
ones.
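For reference, the documented format of zookeeper.connect is a comma-separated list of host:port pairs with a single optional chroot appended after the last host. In that form the host order should not affect the chroot (our assumption based on the documentation; not verified against this setup):
{code:java}
broker.id=-1
...
# chroot appears once, after the last host:port pair
zookeeper.connect=node_1:2181,node_2:2181,node_3:2181/kafka
{code}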

Cluster details:
 * 3-node Kafka cluster running 2.3.1 (also reproduced on 2.4.0)
 * 3-node ZooKeeper cluster running 3.4.10

 

  was:
We've just successfully set up a Kafka cluster consists of 3 brokers and faced 
with the following issue: when we change order of zookeeper servers in 
zookeeper.connect property in server.properties files and restart Kafka broker 
then this Kafka broker tries to connect to a new Kafka cluster. As a result, 
Kafka broker throws an error and shutdown. 

For example, config server.properties on first broker:
{code:java}
broker.id=-1
...
zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka
{code}
 

 

 We changed it to 

 
{code:java}
broker.id=-1
...
zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka
{code}
 

and restart Kafka broker. 

Logs:

{code:java}
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 
09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 
09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 
09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean 
(kafka.utils.Log4jControllerRegistration$)[2020-08-05 09:07:57,656] INFO 
Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler)[2020-08-05 09:07:57,657] 
INFO starting (kafka.server.KafkaServer)[2020-08-05 09:07:57,658] INFO 
Connecting to zookeeper on 
node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka 
(kafka.server.KafkaServer)[2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka 
server] Initializing a new session to node_2:2181. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,690] INFO Client 
environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, 
built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)[2020-08-05 
09:07:57,693] INFO Client environment:host.name=localhost 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,693] INFO Client 
environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)[2020-08-05 
09:07:57,696] INFO Client environment:java.vendor=Ubuntu 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client 
environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client 
environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/current/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/current/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/current/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/current/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/current/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/current/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/current/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/current/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/current/bin/../libs/jersey-client-2.28.jar:/opt/kafka/current/bin/../libs/jersey-common-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-core-2.28.jar:/opt/kafka/current/bin/../libs/jersey-hk2-2.28.jar:/opt/kafka/current/bin/../libs/jersey-media-jaxb-2.28.jar:/opt/kafka/current/bin/../libs/jersey-server-2.28.jar:/opt/kafka/current/bin/../libs/jetty-client-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-continuation-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-http-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-io-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-security-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-server-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-servlet-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-servlets-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jetty-util-9.4.18.v20190429.jar:/opt/kafka/current/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/current/bin/../libs/jsr305-3.0.2.jar:/opt/kafka/current/bin/../libs/kafka-clients-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-log4j-appender-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-examples-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-scala_2.12-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-streams-test-utils-2.3.1.jar:/opt/kafka/current/bin/../libs/kafka-tools-2.3.1.jar:/opt/ka
fka/current/bin/../libs/kafka_2.12-2.3.1-sources.jar:/opt/kafka/current/bin/../libs/kafka_2.12-2.3.1.jar:/opt/kafka/current/bin/../libs/log4j-1.2.17.jar:/opt/kafka/current/bin/../libs/lz4-java-1.6.0.jar:/opt/kafka/current/bin/../libs/maven-artifact-3.6.1.jar:/opt/kafka/current/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/current/bin/../libs/osgi-resource-locator-1.0.1.jar:/opt/kafka/current/bin/../libs/paranamer-2.8.jar:/opt/kafka/current/bin/../libs/plexus-utils-3.2.0.jar:/opt/kafka/current/bin/../libs/reflections-0.9.11.jar:/opt/kafka/current/bin/../libs/rocksdbjni-5.18.3.jar:/opt/kafka/current/bin/../libs/scala-library-2.12.10.jar:/opt/kafka/current/bin/../libs/scala-library-2.12.8.jar:/opt/kafka/current/bin/../libs/scala-logging_2.12-3.9.0.jar:/opt/kafka/current/bin/../libs/scala-reflect-2.12.8.jar:/opt/kafka/current/bin/../libs/slf4j-api-1.7.26.jar:/opt/kafka/current/bin/../libs/slf4j-log4j12-1.7.26.jar:/opt/kafka/current/bin/../libs/snappy-java-1.1.7.3.jar:/opt/kafka/current/bin/../libs/spotbugs-annotations-3.1.9.jar:/opt/kafka/current/bin/../libs/validation-api-2.0.1.Final.jar:/opt/kafka/current/bin/../libs/zkclient-0.11.jar:/opt/kafka/current/bin/../libs/zookeeper-3.4.14.jar:/opt/kafka/current/bin/../libs/zstd-jni-1.4.0-1.jar
 (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,697] INFO Client 
environment:java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,697] INFO Client 
environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)[2020-08-05 
09:07:57,697] INFO Client environment:java.compiler=<NA> 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,697] INFO Client 
environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)[2020-08-05 
09:07:57,697] INFO Client environment:os.arch=amd64 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,698] INFO Client 
environment:os.version=4.15.0-66-generic 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,698] INFO Client 
environment:user.name=kafka (org.apache.zookeeper.ZooKeeper)[2020-08-05 
09:07:57,698] INFO Client environment:user.home=/opt/kafka 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,698] INFO Client 
environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)[2020-08-05 
09:07:57,699] INFO Initiating client connection, connectString=node_2:2181 
sessionTimeout=6000 
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@74bada02 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,718] INFO [ZooKeeperClient 
Kafka server] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,818] INFO Client 
successfully logged in. (org.apache.zookeeper.Login)[2020-08-05 09:07:57,821] 
INFO Client will use DIGEST-MD5 as SASL mechanism. 
(org.apache.zookeeper.client.ZooKeeperSaslClient)[2020-08-05 09:07:57,826] INFO 
Opening socket connection to server node_2/node_2:2181. Will attempt to 
SASL-authenticate using Login Context section 'Client' 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,832] INFO Socket 
connection established to node_2/node_2:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,841] INFO Session 
establishment complete on server node_2/node_2:2181, sessionid = 
0x373bdbbd3b00002, negotiated timeout = 6000 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,847] INFO 
[ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,925] INFO Created 
zookeeper path /kafka,node_3:2181/kafka,node_1:2181/kafka 
(kafka.server.KafkaServer)[2020-08-05 09:07:57,926] INFO [ZooKeeperClient Kafka 
server] Closing. (kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,933] 
INFO Session: 0x373bdbbd3b00002 closed 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,934] INFO EventThread shut 
down for session: 0x373bdbbd3b00002 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,937] INFO 
[ZooKeeperClient Kafka server] Closed. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,939] INFO 
[ZooKeeperClient Kafka server] Initializing a new session to 
node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,939] INFO Initiating 
client connection, 
connectString=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka 
sessionTimeout=6000 
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@6ff65192 
(org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,940] INFO [ZooKeeperClient 
Kafka server] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,941] INFO Client will use 
DIGEST-MD5 as SASL mechanism. 
(org.apache.zookeeper.client.ZooKeeperSaslClient)[2020-08-05 09:07:57,943] INFO 
Opening socket connection to server node_2/node_2:2181. Will attempt to 
SASL-authenticate using Login Context section 'Client' 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,944] INFO Socket 
connection established to node_2/node_2:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,949] INFO Session 
establishment complete on server node_2/node_2:2181, sessionid = 
0x373bdbbd3b00003, negotiated timeout = 6000 
(org.apache.zookeeper.ClientCnxn)[2020-08-05 09:07:57,950] INFO 
[ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:58,167] INFO Cluster ID = 
5_d5S6HeQBWf0ZzwQ6TjRA (kafka.server.KafkaServer)[2020-08-05 09:07:58,253] INFO 
KafkaConfig values:  advertised.host.name = node_3 advertised.listeners = 
SSL://node_3:9093 advertised.port = null alter.config.policy.class.name = null 
alter.log.dirs.replication.quota.window.num = 11 
alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name 
= kafka.security.auth.SimpleAclAuthorizer auto.create.topics.enable = true 
auto.leader.rebalance.enable = true background.threads = 10 broker.id = -1 
broker.id.generation.enable = true broker.rack = null 
client.quota.callback.class = null compression.type = producer 
connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 
600000 connections.max.reauth.ms = 0 control.plane.listener.name = null 
controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 
controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 
30000 create.topic.policy.class.name = null default.replication.factor = 2 
delegation.token.expiry.check.interval.ms = 3600000 
delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null 
delegation.token.max.lifetime.ms = 604800000 
delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true 
fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms 
= 3000 group.max.session.timeout.ms = 60000 group.max.size = 2147483647 
group.min.session.timeout.ms = 10000 host.name = node_3 
inter.broker.listener.name = null inter.broker.protocol.version = 2.3-IV1 
kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] 
leader.imbalance.check.interval.seconds = 300 
leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = 
SSL:SSL listeners = SSL://node_3:9093 log.cleaner.backoff.ms = 15000 
log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 
86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 
log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 
1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 9223372036854775807 
log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 
log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs 
log.dirs = /var/lib/kafka log.flush.interval.messages = 5000 
log.flush.interval.ms = 5000 log.flush.offset.checkpoint.interval.ms = 60000 
log.flush.scheduler.interval.ms = 9223372036854775807 
log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes 
= 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = 
true log.message.format.version = 2.3-IV1 
log.message.timestamp.difference.max.ms = 9223372036854775807 
log.message.timestamp.type = CreateTime log.preallocate = false 
log.retention.bytes = -1 log.retention.check.interval.ms = 300000 
log.retention.hours = 336 log.retention.minutes = null log.retention.ms = null 
log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null 
log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 
60000 max.connections = 2147483647 max.connections.per.ip = 2147483647 
max.connections.per.ip.overrides =  max.incremental.fetch.session.cache.slots = 
1000 message.max.bytes = 2000024 metric.reporters = [] metrics.num.samples = 2 
metrics.recording.level = INFO metrics.sample.window.ms = 30000 
min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 
num.partitions = 4 num.recovery.threads.per.data.dir = 4 
num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 
offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 
offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 
offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 10080 
offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 
offsets.topic.replication.factor = 2 offsets.topic.segment.bytes = 104857600 
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding 
password.encoder.iterations = 4096 password.encoder.key.length = 128 
password.encoder.keyfactory.algorithm = null password.encoder.old.secret = null 
password.encoder.secret = null port = 9092 principal.builder.class = null 
producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 
queued.max.requests = 500 quota.consumer.default = 9223372036854775807 
quota.producer.default = 9223372036854775807 quota.window.num = 11 
quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 
replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 
replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 
replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 
30000 replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 
30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 
1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 
sasl.client.callback.handler.class = null sasl.enabled.mechanisms = [GSSAPI] 
sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = 
null sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.login.callback.handler.class = null sasl.login.class = null 
sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 
60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 
0.05 sasl.mechanism.inter.broker.protocol = GSSAPI 
sasl.server.callback.handler.class = null security.inter.broker.protocol = SSL 
socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 
socket.send.buffer.bytes = 102400 ssl.cipher.suites = [] ssl.client.auth = 
required ssl.enabled.protocols = [TLSv1.2] 
ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden] 
ssl.keymanager.algorithm = SunX509 ssl.keystore.location = 
/etc/ssl/kafka/kafka.keystore.jks ssl.keystore.password = [hidden] 
ssl.keystore.type = JKS ssl.principal.mapping.rules = [DEFAULT] ssl.protocol = 
TLS ssl.provider = null ssl.secure.random.implementation = SHA1PRNG 
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = 
/etc/ssl/kafka/kafka.truststore.jks ssl.truststore.password = [hidden] 
ssl.truststore.type = JKS 
transaction.abort.timed.out.transaction.cleanup.interval.ms = 180000 
transaction.max.timeout.ms = 900000 
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 
transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr 
= 2 transaction.state.log.num.partitions = 50 
transaction.state.log.replication.factor = 2 
transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms 
= 604800000 unclean.leader.election.enable = false zookeeper.connect = 
node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka 
zookeeper.connection.timeout.ms = 15000 zookeeper.max.in.flight.requests = 10 
zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = true 
zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig)[2020-08-05 
09:07:58,272] INFO KafkaConfig values:  advertised.host.name = node_3 
advertised.listeners = SSL://node_3:9093 advertised.port = null 
alter.config.policy.class.name = null 
alter.log.dirs.replication.quota.window.num = 11 
alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name 
= kafka.security.auth.SimpleAclAuthorizer auto.create.topics.enable = true 
auto.leader.rebalance.enable = true background.threads = 10 broker.id = -1 
broker.id.generation.enable = true broker.rack = null 
client.quota.callback.class = null compression.type = producer 
connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 
600000 connections.max.reauth.ms = 0 control.plane.listener.name = null 
controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 
controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 
30000 create.topic.policy.class.name = null default.replication.factor = 2 
delegation.token.expiry.check.interval.ms = 3600000 
delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null 
delegation.token.max.lifetime.ms = 604800000 
delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true 
fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms 
= 3000 group.max.session.timeout.ms = 60000 group.max.size = 2147483647 
group.min.session.timeout.ms = 10000 host.name = node_3 
inter.broker.listener.name = null inter.broker.protocol.version = 2.3-IV1 
kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] 
leader.imbalance.check.interval.seconds = 300 
leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = 
SSL:SSL listeners = SSL://node_3:9093 log.cleaner.backoff.ms = 15000 
log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 
86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 
log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 
1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 9223372036854775807 
log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 
log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs 
log.dirs = /var/lib/kafka log.flush.interval.messages = 5000 
log.flush.interval.ms = 5000 log.flush.offset.checkpoint.interval.ms = 60000 
log.flush.scheduler.interval.ms = 9223372036854775807 
log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes 
= 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = 
true log.message.format.version = 2.3-IV1 
log.message.timestamp.difference.max.ms = 9223372036854775807 
log.message.timestamp.type = CreateTime log.preallocate = false 
log.retention.bytes = -1 log.retention.check.interval.ms = 300000 
log.retention.hours = 336 log.retention.minutes = null log.retention.ms = null 
log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null 
log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 
60000 max.connections = 2147483647 max.connections.per.ip = 2147483647 
max.connections.per.ip.overrides =  max.incremental.fetch.session.cache.slots = 
1000 message.max.bytes = 2000024 metric.reporters = [] metrics.num.samples = 2 
metrics.recording.level = INFO metrics.sample.window.ms = 30000 
min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 
num.partitions = 4 num.recovery.threads.per.data.dir = 4 
num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 
offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 
offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 
offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 10080 
offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 
offsets.topic.replication.factor = 2 offsets.topic.segment.bytes = 104857600 
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding 
password.encoder.iterations = 4096 password.encoder.key.length = 128 
password.encoder.keyfactory.algorithm = null password.encoder.old.secret = null 
password.encoder.secret = null port = 9092 principal.builder.class = null 
producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 
queued.max.requests = 500 quota.consumer.default = 9223372036854775807 
quota.producer.default = 9223372036854775807 quota.window.num = 11 
quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 
replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 
replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 
replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 
30000 replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 
30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 
1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 
sasl.client.callback.handler.class = null sasl.enabled.mechanisms = [GSSAPI] 
sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = 
null sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = SSL
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = required
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = HTTPS
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /etc/ssl/kafka/kafka.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.principal.mapping.rules = [DEFAULT]
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = SHA1PRNG
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /etc/ssl/kafka/kafka.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 180000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 2
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 2
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka
zookeeper.connection.timeout.ms = 15000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = true
zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-08-05 09:07:58,328] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-08-05 09:07:58,328] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-08-05 09:07:58,331] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-08-05 09:07:58,361] INFO Loading logs. (kafka.log.LogManager)
[2020-08-05 09:07:58,374] INFO Logs loading complete in 13 ms. (kafka.log.LogManager)
[2020-08-05 09:07:58,403] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-08-05 09:07:58,407] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-08-05 09:07:58,805] INFO Awaiting socket connections on node_3:9093. (kafka.network.Acceptor)
[2020-08-05 09:08:00,594] INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(node_3,9093,ListenerName(SSL),SSL) (kafka.network.SocketServer)
[2020-08-05 09:08:00,596] INFO [SocketServer brokerId=0] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2020-08-05 09:08:00,629] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,630] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,630] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,631] INFO [ExpirationReaper-0-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:08:00,660] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-08-05 09:08:00,726] INFO Creating /brokers/ids/0 (is it secure? true) (kafka.zk.KafkaZkClient)
[2020-08-05 09:08:00,751] ERROR Error while creating ephemeral at /brokers/ids/0, node already exists and owner '248751018843570177' does not match current session '248751018843570179' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2020-08-05 09:08:00,757] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:122)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1784)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1722)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:262)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)
[2020-08-05 09:08:00,763] INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)
{code}
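For reference, the failure happens at the broker-registration step: the broker cannot create its ephemeral /brokers/ids/0 znode because another session already owns it. The owner of the conflicting znode can be inspected with the bundled zookeeper-shell (a sketch only; the node_1 address and the /kafka chroot are taken from our config):
{code:java}
# Print znode metadata; 'ephemeralOwner' is the ZooKeeper session ID that
# currently holds the registration, which can be compared with the session
# ID shown in the broker's error message above.
bin/zookeeper-shell.sh node_1:2181 stat /kafka/brokers/ids/0
{code}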
 

As you can see, this broker tries to join the cluster with ID 
_5_d5S6HeQBWf0ZzwQ6TjRA_, while the other brokers are connected to the 
cluster with ID _OhWuEGMeQHe66HP74rurRA_.
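
To double-check which cluster ID is actually registered under the chroot, the cluster ID znode can be read directly (a sketch, assuming the /kafka chroot from the config above):
{code:java}
# Read the cluster ID stored in ZooKeeper; healthy brokers log this same
# ID ("Cluster ID = ...") during startup.
bin/zookeeper-shell.sh node_1:2181 get /kafka/cluster/id
# e.g. {"version":"1","id":"OhWuEGMeQHe66HP74rurRA"}
{code}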


If we restore the original order of the ZooKeeper servers in the config file, 
the Kafka broker starts normally and joins the existing cluster.
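
We suspect this is related to connect-string parsing: ZooKeeper treats everything after the first '/' in the whole string as the chroot, so reordering the hosts changes the effective chroot path. The documented form puts the chroot once, after the full host list; we have not yet verified this as a workaround:
{code:java}
# chroot written once at the end of the host list, as documented for zookeeper.connect
zookeeper.connect=node_1:2181,node_2:2181,node_3:2181/kafka
{code}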

This issue blocks us from adding new nodes to the cluster and removing the old 
ones.

Cluster details:
 * 3-node Kafka cluster running 2.3.1 (also reproduced on 2.4.0)
 * 3-node ZooKeeper cluster running 3.4.10

 


> Broker tries to connect to a new cluster when there are changes in 
> zookeeper.connect properties
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10363
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10363
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.3.1
>         Environment: 3 Kafka brokers (v2.3.1, v2.4.0) with Zookeeper cluster 
> (3.4.10)
> Ubuntu 18.04 LTS
>            Reporter: Alexey Kornev
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
