[ 
https://issues.apache.org/jira/browse/KAFKA-14778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697181#comment-17697181
 ] 

Matthias J. Sax commented on KAFKA-14778:
-----------------------------------------

Thanks for reporting this. – I believe the issue is as follows:
 * on restart, the consumer sends a join-group request (with the latest 
subscription version)
 * broker returns buffered assignment without a rebalance (as it should)
 * KS inspects the assignment and observes the lower subscription version – now 
things go "wrong"
 ** KS thinks that the assignment is invalid (because, on version probing, the 
leader should have sent an empty assignment, only encoding its version number)
 ** KS triggers a new rebalance, sending its subscription with the lower 
subscription version (as it assumes the leader could not decode the first 
subscription it sent)

It seems the fix might be to check whether static membership is enabled and 
whether the received assignment is empty or not. If static membership is 
enabled and the assignment is not empty, it is not necessary to trigger a new 
rebalance.

One drawback is that all KS instances would stay on the old assignment version, 
so it might actually be desirable to have a single rebalance that allows all 
instances to switch to the new subscription/assignment version. This single 
rebalance should only happen _after_ all instances have been updated. However, 
it is unclear how we could trigger such a rebalance, and it is also an open 
question whether this rebalance is really desired.
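The proposed check could be sketched as follows. This is a hypothetical, 
self-contained sketch of the decision only; the class, method, and parameter 
names are illustrative and not the actual StreamsPartitionAssignor API:

```java
// Hypothetical sketch: on receiving an assignment encoded with a lower
// version than the subscription we sent, only trigger a follow-up rebalance
// if we cannot rule out that the leader really failed to decode us.
public class VersionProbingCheck {

    // Returns true if the instance should downgrade its subscription version
    // and schedule an immediate rebalance.
    static boolean shouldTriggerRebalance(int sentSubscriptionVersion,
                                          int receivedAssignmentVersion,
                                          boolean staticMembershipEnabled,
                                          boolean assignmentIsEmpty) {
        if (receivedAssignmentVersion >= sentSubscriptionVersion) {
            return false; // leader understood our subscription; nothing to do
        }
        // Proposed change: with static membership, a non-empty lower-version
        // assignment is a buffered pre-restart assignment, not version probing.
        if (staticMembershipEnabled && !assignmentIsEmpty) {
            return false;
        }
        return true; // genuine version probing: downgrade and rebalance
    }

    public static void main(String[] args) {
        // Scenario from this ticket: a static member restarts, sends a v11
        // subscription, and gets its buffered v8 assignment (with partitions)
        // back without a rebalance.
        System.out.println(shouldTriggerRebalance(11, 8, true, false));
        // Without static membership, the current behavior is kept.
        System.out.println(shouldTriggerRebalance(11, 8, false, false));
    }
}
```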

> Kafka Streams 2.7.1 to 3.3.1 rolling upgrade with static membership triggers 
> a rebalance
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14778
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14778
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.1, 3.3.1
>            Reporter: Vinoth Rengarajan
>            Priority: Major
>
> Trying to upgrade a Kafka Streams application from 2.7.1 to 3.3.1 with static 
> membership, but it triggers a rebalance.
> Brokers are running on Kafka 2.7.1. Enabled static membership in the 
> application. Below are the configs *(Streams Config & Consumer Config)*.
> Followed the steps below to upgrade:
>  * Brokers are running on Kafka 2.7.1 (also tried with 3.3.1 brokers; the 
> rebalance still happens).
>  * Application is running with 2.7.1 Kafka Streams libraries.
>  * Deployed the latest version of the application with 3.3.1 Kafka Streams 
> libraries, and configured the *upgrade.from* property to 2.7 (based on the 
> upgrade documentation available at 
> [https://kafka.apache.org/33/documentation/streams/upgrade-guide]).
>  * When doing a rolling bounce with the latest changes, a rebalance is 
> triggered on the other instances in the cluster.
> Below are the logs from the instance being bounced, which forces a rebalance 
> on the others. 
> *Logs:*
>  
> {code:java}
> INFO  2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client 
> [kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to 
> REBALANCING
> INFO  2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer 
> instanceId=kafka_upgrade.Kafka_Upgrade_Test-4, 
> clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer, 
> groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new 
> Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11, 
> kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] 
> Requested to schedule immediate rebalance due to version probing.
> INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] 
> Requested to schedule immediate rebalance due to version probing.
> INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] 
> Requested to schedule immediate rebalance due to version probing.
> INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] 
> Requested to schedule immediate rebalance due to version probing. {code}
>  
>  
> *Streams Config:*
>  
> {code:java}
> acceptable.recovery.lag = 10000
> application.id = Kafka_Upgrade_Test
> application.server =
> bootstrap.servers = [broker1, broker2, broker3]
> buffered.records.per.partition = 1000
> built.in.metrics.version = latest
> cache.max.bytes.buffering = 10485760
> client.id = kafka_upgrade.Kafka_Upgrade_Test
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class 
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.dsl.store = rocksDB
> default.key.serde = null
> default.list.key.serde.inner = null
> default.list.key.serde.type = null
> default.list.value.serde.inner = null
> default.list.value.serde.type = null
> default.production.exception.handler = class 
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class 
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = null
> max.task.idle.ms = 0
> max.warmup.replicas = 2
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 4
> poll.ms = 100
> probing.rebalance.interval.ms = 600000
> processing.guarantee = at_least_once
> rack.aware.assignment.tags = []
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> repartition.purge.interval.ms = 30000
> replication.factor = 3
> request.timeout.ms = 120000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/yukon-apps
> task.timeout.ms = 300000
> topology.optimization = none
> upgrade.from = 2.7
> window.size.ms = null
> windowed.inner.class.serde = null
> windowstore.changelog.additional.retention.ms = 86400000 {code}
>  
>  
> *Consumer Config:*
>  
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 10000
> auto.offset.reset = none
> bootstrap.servers = [server1, server2, server3]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Kafka_Upgrade_Test
> group.instance.id = Kafka_Upgrade_Test-1
> heartbeat.interval.ms = 5000
> interceptor.classes = []
> internal.leave.group.on.close = false
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 1073741823
> max.poll.records = 2000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = 
> [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 120000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 300000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
