[ https://issues.apache.org/jira/browse/KAFKA-14778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697181#comment-17697181 ]
Matthias J. Sax commented on KAFKA-14778:
-----------------------------------------

Thanks for reporting this. – I believe the issue is as follows:
* on restart, the consumer sends a join-group request (with the latest subscription version)
* the broker returns the buffered assignment without a rebalance (as it should)
* KS inspects the assignment and observes the lower version – now things go "wrong"
** KS thinks that the assignment is invalid (because on version probing the leader should have sent an empty assignment encoding only its version number)
** KS triggers a new rebalance, sending its subscription with the lower subscription version (as it assumes the leader could not decode the first subscription it sent)

It seems the fix might be to check whether static membership is enabled and whether the received assignment is empty. If static membership is enabled and the assignment is not empty, it's not necessary to trigger a new rebalance. One drawback is that all KS instances would then stay on the old assignment version, so it might actually be desirable to have a single rebalance that allows all instances to switch to the new subscription/assignment version. This single rebalance should only happen _after_ all instances have been updated. However, it's unclear how we could trigger such a rebalance, and it's also an open question whether this rebalance is really desired.
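For illustration, a minimal sketch of that check (method and parameter names here are hypothetical, not the actual StreamsPartitionAssignor internals):

{code:java}
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only; names do not match the real implementation.
public class VersionProbingCheck {

    static boolean shouldTriggerDowngradeRebalance(final int sentVersion,
                                                   final int receivedVersion,
                                                   final List<TopicPartition> ownedPartitions,
                                                   final boolean staticMembershipEnabled) {
        if (receivedVersion >= sentVersion) {
            return false; // the leader understood our subscription; nothing to do
        }
        // With static membership, the broker may hand back the buffered,
        // non-empty assignment without a rebalance; a lower version then does
        // NOT mean the leader failed to decode our subscription.
        if (staticMembershipEnabled && !ownedPartitions.isEmpty()) {
            return false;
        }
        // Genuine version probing: the leader sent an empty assignment
        // encoding only its own supported version, so downgrade and rejoin.
        return true;
    }
}
{code}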
> Kafka Streams 2.7.1 to 3.3.1 rolling upgrade with static membership triggers a rebalance
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14778
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14778
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.1, 3.3.1
>            Reporter: Vinoth Rengarajan
>            Priority: Major
>
> Trying to upgrade a Kafka Streams application from 2.7.1 to 3.3.1 with static membership, but it triggers a rebalance.
> Brokers are running on Kafka 2.7.1. Static membership is enabled in the application. Below are the configs *(Streams Config & Consumer Config)*. Followed the steps below to upgrade (a sketch of the relevant settings follows the logs):
> * Brokers are running on Kafka 2.7.1 (also tried with 3.3.1 brokers; the rebalance happens then as well).
> * The application is running with 2.7.1 Kafka Streams libraries.
> * Deployed the latest version of the application with 3.3.1 Kafka Streams libraries, and configured the *upgrade.from* property to 2.7 (based on the upgrade documentation available at [https://kafka.apache.org/33/documentation/streams/upgrade-guide]).
> * While doing a rolling bounce with the latest changes, a rebalance is triggered on the other instances in the cluster.
> Below are the logs from the instance being bounced, which forces a rebalance on the others.
> *Logs:*
>
> {code:java}
> INFO 2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client [kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to REBALANCING
> INFO 2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer instanceId=kafka_upgrade.Kafka_Upgrade_Test-4, clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer, groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11, kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Requested to schedule immediate rebalance due to version probing.
> {code}
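>
> For reference, a minimal sketch of the upgrade-related settings used during the rolling bounce (broker address and instance id are placeholders; the values mirror the configs below):
>
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class UpgradeBounceConfig {
>     public static Properties streamsProperties() {
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka_Upgrade_Test");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
>         // Keep the 3.3.1 library on the 2.7-compatible rebalance protocol during the bounce:
>         props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.7");
>         // Static membership: a stable per-instance id lets the broker return the
>         // buffered assignment on restart instead of triggering a rebalance.
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
>                   "Kafka_Upgrade_Test-1");
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 300000);
>         return props;
>     }
> }
> {code}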
>
> *Streams Config:*
>
> {code:java}
> acceptable.recovery.lag = 10000
> application.id = Kafka_Upgrade_Test
> application.server =
> bootstrap.servers = [broker1, broker2, broker3]
> buffered.records.per.partition = 1000
> built.in.metrics.version = latest
> cache.max.bytes.buffering = 10485760
> client.id = kafka_upgrade.Kafka_Upgrade_Test
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.dsl.store = rocksDB
> default.key.serde = null
> default.list.key.serde.inner = null
> default.list.key.serde.type = null
> default.list.value.serde.inner = null
> default.list.value.serde.type = null
> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = null
> max.task.idle.ms = 0
> max.warmup.replicas = 2
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 4
> poll.ms = 100
> probing.rebalance.interval.ms = 600000
> processing.guarantee = at_least_once
> rack.aware.assignment.tags = []
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> repartition.purge.interval.ms = 30000
> replication.factor = 3
> request.timeout.ms = 120000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/yukon-apps
> task.timeout.ms = 300000
> topology.optimization = none
> upgrade.from = 2.7
> window.size.ms = null
> windowed.inner.class.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> {code}
>
> *Consumer Config:*
>
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 10000
> auto.offset.reset = none
> bootstrap.servers = [server1, server2, server3]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Kafka_Upgrade_Test
> group.instance.id = Kafka_Upgrade_Test-1
> heartbeat.interval.ms = 5000
> interceptor.classes = []
> internal.leave.group.on.close = false
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 1073741823
> max.poll.records = 2000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 120000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 300000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)