Re: Kafka Streams 2.7.1 to 3.3.1 rolling upgrade

2023-02-27 Thread Matthias J. Sax

Hmmm... that's interesting...

It seems that Kafka Streams "version probing" does not play well with
static group membership...


Sounds like a "bug" to me -- well, more like a missing integration. Not
sure right now if/how we could fix it.


Can you file a ticket?

For now, I don't think you can do anything about it. Sorry. :(


-Matthias



On 2/27/23 6:50 AM, Vinoth Rengarajan wrote:

Hi Team,

I am trying to upgrade my Kafka Streams application from 2.7.1 to 3.3.1.
Brokers are running on Kafka 2.7.1. The plan is to upgrade the clients
first and then the brokers.

I have already enabled static membership in our application, so I am not
expecting a rebalance. Below are the configs *(Streams Config & Consumer
Config)*.
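
(For reference, a minimal sketch of how static membership is typically
wired into a Streams app; the instance id and session timeout values below
are illustrative, not the poster's actual settings.)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka_Upgrade_Test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// Static membership: a stable per-instance id lets the broker hand the
// same member its old assignment back after a restart instead of
// rebalancing (each stream thread's consumer derives its own id from it).
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
        "kafka-upgrade-instance-1");
// The session timeout should comfortably exceed the bounce window, so the
// restarting member is not evicted from the group in the meantime.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
        60000);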

As mentioned earlier, the application is running on Kafka 2.7.1. I deployed
the latest version of the app with the 3.3.1 Streams libraries, and configured
the '*upgrade.from*' property to 2.7 (based on the upgrade documentation
available at
https://kafka.apache.org/33/documentation/streams/upgrade-guide). When I
do a rolling bounce with the latest changes, I can see a rebalance being
triggered on the other instances in the cluster.
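
(For reference, that property amounts to one entry against the same
hypothetical props object sketched above; per the upgrade guide it is
removed again, with a second rolling bounce, once every instance runs
3.3.1.)

// Keep speaking the 2.7 rebalance protocol during the rolling bounce.
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.7");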

I can see the below logs on the instance that is being bounced, forcing a
rebalance on the others. Am I missing something? How can I prevent the
other instances in the cluster from rebalancing?


*Logs:*
INFO  2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client
[kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to
REBALANCING
INFO  2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer
instanceId=kafka_upgrade.Kafka_Upgrade_Test-4,
clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer,
groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new
Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11,
kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
Requested to schedule immediate rebalance due to version probing.

*Streams Config:*

acceptable.recovery.lag = 10000
application.id = Kafka_Upgrade_Test
application.server =
bootstrap.servers = [broker1, broker2, broker3]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id = kafka_upgrade.Kafka_Upgrade_Test
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
max.task.idle.ms = 0
max.warmup.replicas = 2

Re: producer purgatory

2023-02-27 Thread David Ballano Fernandez
thank you!

On Mon, Feb 27, 2023 at 12:37 PM David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Hi guys,
>
> I am load testing a couple of clusters, one with local SSD disks and
> another with Ceph.
>
> Both clusters have the same amount of CPU/RAM, and they are configured
> the same way.
> I'm sending the same number of messages, producing with linger.ms=0 and
> acks=all.
>
> Besides seeing higher latencies on Ceph for the most part compared to
> local disk, there is something I don't understand.
>
> On the local disk cluster, messages per second matches the number of
> produce requests per second exactly,
> but on the Ceph cluster the messages do not match the total produce
> requests per second.
>
> The only thing I can find is that the producer purgatory on the Ceph
> Kafka cluster has more requests queued up than on the local disk cluster.
>
> Also, RemoteTimeMs for produce requests is high, which could explain why
> there are more requests in the purgatory.
>
> I think this means the broker is waiting on acks from all replicas, since
> acks is set to all. But I don't understand why the local disk cluster's
> purgatory queue is so much lower.
>
> Since I don't think disk is used for this, could network saturation (Ceph
> being network storage) be interfering with the acks the producer is
> waiting for? Is there a way to tune the producer purgatory? I did change
> num.replica.fetchers, but that only lowered the fetch purgatory.


producer purgatory

2023-02-27 Thread David Ballano Fernandez
Hi guys,

I am load testing a couple of clusters, one with local SSD disks and
another with Ceph.

Both clusters have the same amount of CPU/RAM, and they are configured the
same way.
I'm sending the same number of messages, producing with linger.ms=0 and
acks=all.
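
(For context, those producer settings amount to something like the sketch
below; the broker address and serializers are illustrative.)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// acks=all: the leader replies only after all in-sync replicas have the
// record, so each produce request waits in purgatory for replication.
props.put(ProducerConfig.ACKS_CONFIG, "all");
// linger.ms=0: send right away rather than batching.
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);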

Besides seeing higher latencies on Ceph for the most part compared to
local disk, there is something I don't understand.

On the local disk cluster, messages per second matches the number of
produce requests per second exactly,
but on the Ceph cluster the messages do not match the total produce
requests per second.

The only thing I can find is that the producer purgatory on the Ceph Kafka
cluster has more requests queued up than on the local disk cluster.

Also, RemoteTimeMs for produce requests is high, which could explain why
there are more requests in the purgatory.

I think this means the broker is waiting on acks from all replicas, since
acks is set to all. But I don't understand why the local disk cluster's
purgatory queue is so much lower.

Since I don't think disk is used for this, could network saturation (Ceph
being network storage) be interfering with the acks the producer is waiting
for? Is there a way to tune the producer purgatory? I did change
num.replica.fetchers, but that only lowered the fetch purgatory.
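
(In case it helps anyone comparing the two clusters: both numbers are
exposed over JMX. A small sketch that polls them; the broker host and JMX
port 9999 are assumptions.)

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class PurgatoryProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        MBeanServerConnection conn =
            JMXConnectorFactory.connect(url).getMBeanServerConnection();
        // Produce requests currently parked in purgatory, waiting on replication.
        Object size = conn.getAttribute(new ObjectName(
            "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce"),
            "Value");
        // Time produce requests spend waiting on follower acks (the "remote" phase).
        Object remote = conn.getAttribute(new ObjectName(
            "kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=Produce"),
            "Mean");
        System.out.println("Produce purgatory size: " + size
            + ", RemoteTimeMs mean: " + remote);
    }
}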


Kafka Streams 2.7.1 to 3.3.1 rolling upgrade

2023-02-27 Thread Vinoth Rengarajan
Hi Team,

I am trying to upgrade my Kafka Streams application from 2.7.1 to 3.3.1.
Brokers are running on Kafka 2.7.1. The plan is to upgrade the clients
first and then the brokers.

I have already enabled static membership in our application, so I am not
expecting a rebalance. Below are the configs *(Streams Config & Consumer
Config)*.

As mentioned earlier, the application is running on Kafka 2.7.1. I deployed
the latest version of the app with the 3.3.1 Streams libraries, and configured
the '*upgrade.from*' property to 2.7 (based on the upgrade documentation
available at
https://kafka.apache.org/33/documentation/streams/upgrade-guide). When I
do a rolling bounce with the latest changes, I can see a rebalance being
triggered on the other instances in the cluster.

I can see the below logs on the instance that is being bounced, forcing a
rebalance on the others. Am I missing something? How can I prevent the
other instances in the cluster from rebalancing?


*Logs:*
INFO  2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client
[kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to
REBALANCING
INFO  2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer
instanceId=kafka_upgrade.Kafka_Upgrade_Test-4,
clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer,
groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new
Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11,
kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
Requested to schedule immediate rebalance due to version probing.

*Streams Config:*

acceptable.recovery.lag = 10000
application.id = Kafka_Upgrade_Test
application.server =
bootstrap.servers = [broker1, broker2, broker3]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id = kafka_upgrade.Kafka_Upgrade_Test
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 4
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
rack.aware.assignment.tags = []
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backo