Re: Kafka Streams 2.7.1 to 3.3.1 rolling upgrade
Hmmm... that's interesting... It seems that Kafka Streams "version probing" does not play well with static group membership... Sounds like a "bug" to me -- well, more like a missing integration. Not sure right now if/how we could fix it. Can you file a ticket?

For now, I don't think you can do anything about it. Sorry. :(

-Matthias

On 2/27/23 6:50 AM, Vinoth Rengarajan wrote:
> Hi Team,
>
> I am trying to upgrade my Kafka Streams application from 2.7.1 to 3.3.1.
> Brokers are running on Kafka 2.7.1. The plan is to upgrade the clients
> first and then the brokers. I have already enabled static membership in
> our application, so I am not expecting a rebalance. The configs (Streams
> config & consumer config) are below.
>
> As mentioned earlier, the application is running on Kafka 2.7.1. I deployed
> the latest version of the app with the 3.3.1 Streams libraries and
> configured the 'upgrade.from' property to 2.7 (based on the upgrade
> documentation available here:
> https://kafka.apache.org/33/documentation/streams/upgrade-guide). When I do
> a rolling bounce with these changes, I can see a rebalance being triggered
> on the other instances in the cluster. I can see the logs below on the
> instance that is being bounced, forcing a rebalance on the others.
>
> Am I missing something? How can I prevent the other instances in the
> cluster from rebalancing?
>
> Logs:
> INFO 2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client [kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to REBALANCING
> INFO 2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer instanceId=kafka_upgrade.Kafka_Upgrade_Test-4, clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer, groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11, kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Requested to schedule immediate rebalance due to version probing.
>
> Streams Config:
> acceptable.recovery.lag = 1
> application.id = Kafka_Upgrade_Test
> application.server =
> bootstrap.servers = [broker1, broker2, broker3]
> buffered.records.per.partition = 1000
> built.in.metrics.version = latest
> cache.max.bytes.buffering = 10485760
> client.id = kafka_upgrade.Kafka_Upgrade_Test
> commit.interval.ms = 3
> connections.max.idle.ms = 54
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.dsl.store = rocksDB
> default.key.serde = null
> default.list.key.serde.inner = null
> default.list.key.serde.type = null
> default.list.value.serde.inner = null
> default.list.value.serde.type = null
> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = null
> max.task.idle.ms = 0
> max.warmup.replicas
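For readers following along, here is a minimal sketch of the Streams-side setting discussed above (upgrade.from set for the first rolling bounce of a 2.7.1 -> 3.3.1 upgrade). It is an illustration, not the poster's actual code; the broker list is a placeholder.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class FirstBounceConfig {

        // Builds the Streams properties for the first rolling bounce: the 3.3.1
        // library is told to keep using the 2.7 rebalance metadata until every
        // instance runs the new jar.
        public static Properties build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka_Upgrade_Test");
            // Placeholder broker list (the thread anonymizes the real one).
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "broker1:9092,broker2:9092,broker3:9092");
            // upgrade.from = "2.7", as set by the poster for the first bounce.
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.7");
            return props;
        }
    }

Per the linked upgrade guide, the upgrade.from setting is removed again in a second rolling bounce once every instance is on 3.3.1.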
Re: producer purgatory
thank you!

On Mon, Feb 27, 2023 at 12:37 PM David Ballano Fernandez <dfernan...@demonware.net> wrote:

> Hi guys,
>
> I am load testing a couple of clusters, one with local SSD disks and
> another one with Ceph. Both clusters have the same amount of CPU/RAM and
> they are configured the same way. I'm sending the same amount of messages
> and producing with linger.ms=0 and acks=all.
>
> Besides seeing higher latencies on Ceph for the most part compared to
> local disk, there is something that I don't understand. On the local-disk
> cluster, messages per second matches exactly the number of requests, but
> on the Ceph cluster messages do not match total produce requests per
> second. The only thing I can find is that the producer purgatory in the
> Ceph Kafka cluster has more requests queued up than on local disk. Also,
> RemoteTime-ms for producers is high, which could explain why there are
> more requests in the purgatory.
>
> To me, this means that the producer is waiting to hear back for all the
> acks, which are set to "all". But I don't understand why the local-disk
> Kafka cluster's purgatory queue is way lower, since I don't think disk is
> used for this. Could network saturation, since Ceph is network storage, be
> interfering with the producer waiting for acks? Is there a way to tune the
> producer purgatory? I did change num.replica.fetchers, but that only
> lowered the fetch purgatory.
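For anyone comparing the two clusters on the numbers discussed above, below is a rough sketch of reading the produce purgatory size and produce RemoteTimeMs over JMX. The broker host/port are placeholders, and the MBean and attribute names ("Value", "Mean") are the standard names exposed by recent brokers; treat them as assumptions to verify against your broker version.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class PurgatoryProbe {
        public static void main(String[] args) throws Exception {
            // Assumes the broker exposes JMX on port 9999 (placeholder host/port).
            JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbsc = connector.getMBeanServerConnection();

                // Produce requests currently parked in the purgatory, i.e. waiting
                // for acks=all replication to complete.
                Object purgatorySize = mbsc.getAttribute(
                    new ObjectName("kafka.server:type=DelayedOperationPurgatory,"
                        + "delayedOperation=Produce,name=PurgatorySize"),
                    "Value");

                // Time produce requests spend waiting on followers (the
                // "RemoteTime-ms" mentioned above); "Mean" is one histogram attribute.
                Object remoteTimeMean = mbsc.getAttribute(
                    new ObjectName("kafka.network:type=RequestMetrics,"
                        + "name=RemoteTimeMs,request=Produce"),
                    "Mean");

                System.out.println("Produce purgatory size: " + purgatorySize);
                System.out.println("Produce RemoteTimeMs mean: " + remoteTimeMean);
            }
        }
    }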
producer purgatory
Hi guys,

I am load testing a couple of clusters, one with local SSD disks and another one with Ceph. Both clusters have the same amount of CPU/RAM and they are configured the same way. I'm sending the same amount of messages and producing with linger.ms=0 and acks=all.

Besides seeing higher latencies on Ceph for the most part compared to local disk, there is something that I don't understand. On the local-disk cluster, messages per second matches exactly the number of requests, but on the Ceph cluster messages do not match total produce requests per second. The only thing I can find is that the producer purgatory in the Ceph Kafka cluster has more requests queued up than on local disk. Also, RemoteTime-ms for producers is high, which could explain why there are more requests in the purgatory.

To me, this means that the producer is waiting to hear back for all the acks, which are set to "all". But I don't understand why the local-disk Kafka cluster's purgatory queue is way lower, since I don't think disk is used for this. Could network saturation, since Ceph is network storage, be interfering with the producer waiting for acks? Is there a way to tune the producer purgatory? I did change num.replica.fetchers, but that only lowered the fetch purgatory.
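For reference, a minimal sketch of a producer configured the way this load test describes (linger.ms=0, acks=all); the broker address, topic name, and message count are placeholders rather than details from the thread.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class LoadTestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder bootstrap server; not taken from the thread.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Settings from the load test described above: no batching delay,
            // and wait for the full in-sync replica set to acknowledge each write.
            props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 1_000; i++) {
                    producer.send(new ProducerRecord<>("loadtest-topic", "key-" + i, "value-" + i));
                }
                producer.flush();
            }
        }
    }

With acks=all, each produce request sits in the broker's produce purgatory until all in-sync replicas have replicated the batch, so slower follower fetches show up as higher RemoteTime and a deeper purgatory.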
Kafka Streams 2.7.1 to 3.3.1 rolling upgrade
Hi Team,

I am trying to upgrade my Kafka Streams application from 2.7.1 to 3.3.1. Brokers are running on Kafka 2.7.1. The plan is to upgrade the clients first and then the brokers. I have already enabled static membership in our application, so I am not expecting a rebalance. The configs (Streams config & consumer config) are below.

As mentioned earlier, the application is running on Kafka 2.7.1. I deployed the latest version of the app with the 3.3.1 Streams libraries and configured the 'upgrade.from' property to 2.7 (based on the upgrade documentation available here: https://kafka.apache.org/33/documentation/streams/upgrade-guide). When I do a rolling bounce with these changes, I can see a rebalance being triggered on the other instances in the cluster. I can see the logs below on the instance that is being bounced, forcing a rebalance on the others.

Am I missing something? How can I prevent the other instances in the cluster from rebalancing?

Logs:
INFO 2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client [kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to REBALANCING
INFO 2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer instanceId=kafka_upgrade.Kafka_Upgrade_Test-4, clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer, groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11, kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Sent a version 11 subscription and got version 8 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.
INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Requested to schedule immediate rebalance due to version probing.
INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Requested to schedule immediate rebalance due to version probing.
INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Requested to schedule immediate rebalance due to version probing.
INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Requested to schedule immediate rebalance due to version probing.

Streams Config:
acceptable.recovery.lag = 1
application.id = Kafka_Upgrade_Test
application.server =
bootstrap.servers = [broker1, broker2, broker3]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id = kafka_upgrade.Kafka_Upgrade_Test
commit.interval.ms = 3
connections.max.idle.ms = 54
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
num.standby.replicas = 0
num.stream.threads = 4
poll.ms = 100
probing.rebalance.interval.ms = 60
processing.guarantee = at_least_once
rack.aware.assignment.tags = []
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backo
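For context on the static membership mentioned at the top of this post, here is a minimal sketch (not the poster's code) of how a Streams instance is typically given a stable group.instance.id through the consumer prefix; the instance-id value and broker list are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StaticMembershipConfig {

        // Gives each Streams instance a stable consumer group.instance.id so a
        // plain restart within the session timeout does not trigger a rebalance.
        public static Properties build(String instanceId) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka_Upgrade_Test");
            // Placeholder broker list (bootstrap.servers is anonymized above).
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "broker1:9092,broker2:9092,broker3:9092");
            // The embedded consumers pick this up via the consumer prefix, e.g.
            // instanceId = "kafka_upgrade.Kafka_Upgrade_Test-4" as seen in the logs.
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                      instanceId);
            return props;
        }
    }

Static membership avoids rebalances for plain restarts within the session timeout; as the reply in this thread points out, the version-probing exchange during the upgrade still schedules a follow-up rebalance.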