Hello,

We have a consumer application running Kafka Streams 3.4, with our
Kafka brokers on 2.6. The application consumes from two topics with 250
partitions each, and the topics are co-partitioned so that each task
consumes the same partitions of both topics. Some other settings:

num.stream.threads = 32 # fleet size is roughly 20 instances
num.standby.replicas = 1
probing.rebalance.interval.ms = 60000
max.warmup.replicas = 8
session.timeout.ms = 5000
heartbeat.interval.ms = 1600
acceptable.recovery.lag = 100000
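
For reference, the settings above correspond to StreamsConfig
properties roughly like this (a simplified sketch rather than our
actual setup code; the application id and bootstrap servers are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        // Placeholders, not our real application id / brokers
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Assignment-related settings listed above
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 32);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 60_000L);
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 8);
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 100_000L);
        // Consumer-level timeouts, passed through with the consumer prefix
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5_000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1_600);
        return props;
    }
}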

We observe that during rolling code deployments, where we deploy new
code slowly through the whole fleet, a few instances are liable to end
up with most of the active tasks and then only very slowly drain those
tasks to other instances. We instrumented the number of active tasks
per instance in our fleet to get a better idea of the hotspotting (a
sketch of the counting logic follows the screenshots). Here are some
results:

[image: Screenshot 2023-10-09 at 6.50.19 AM.png]

   - A sequence of 3 deployments each displaying hotspots on the same 2
   instances

[image: Screenshot 2023-10-05 at 9.41.16 AM.png]

   - A detailed breakdown of active tasks per instance (different
   instances than in the image above), showing the slow draining of
   active tasks to less loaded instances
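
For context, the per-instance active task count above is gathered with
something roughly like the following (a simplified sketch rather than
our exact instrumentation; how the value is reported to our metrics
system is omitted):

import org.apache.kafka.streams.KafkaStreams;

public class ActiveTaskCount {
    // Sum the active tasks currently assigned to this instance across
    // all of its local stream threads.
    public static long countActiveTasks(KafkaStreams streams) {
        return streams.metadataForLocalThreads().stream()
                .mapToLong(thread -> thread.activeTasks().size())
                .sum();
    }
}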

One thing that we tried was setting acceptable.recovery.lag to 100,000,
because we have fast moving topics and suspected that warmup tasks were
not transitioning to active quickly because their changelog lag stayed
above the acceptable threshold given the volume of data. However, this
did not solve the problem. We also added an artificial delay of 5
minutes between the deployments of successive instances, hoping that
this would provide more time for rebalancing to occur during the
deployment, but that did not materially change the behavior either. One
behavior we observed is that the two overloaded instances are typically
also the ones whose client ids appear first when iterating through a
TreeMap (which the HighAvailabilityTaskAssignor
<https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java#L58>
does).
Another observation is that the StreamsPartitionAssignor
<https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java#L1121>
biases towards picking threads 1 and 10-19 over threads 2-9 and 20-32,
due to the natural (lexicographic) ordering in the SortedSet.
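
To illustrate the ordering we mean, here is a standalone example (not
taken from the assignor itself; the thread names are made up but follow
the usual "<client-id>-StreamThread-<n>" pattern):

import java.util.TreeSet;

public class ThreadOrdering {
    public static void main(String[] args) {
        // Strings sort lexicographically, so "-StreamThread-10" through
        // "-19" come before "-StreamThread-2" when iterating the set.
        TreeSet<String> threads = new TreeSet<>();
        for (int i = 1; i <= 32; i++) {
            threads.add("app-instance-1-StreamThread-" + i);
        }
        // Prints 1, 10-19, 2, 20-29, 3, 30-32, 4, 5, ..., 9
        threads.forEach(System.out::println);
    }
}

The point is just that iteration over the sorted structure is
deterministic, so the same threads keep showing up at the front.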
However, these observations haven't really helped us find a way to
resolve the problem. We were also wondering whether the imbalance is
caused by the total number of threads in our fleet being larger than
the expected number of tasks (640 total threads > 250 active + 250
standby = 500 tasks), but lowering the total thread count in our fleet
to 200 did not change the behavior. Are there any other configurations
we should be looking at?

Thank you,
Sabit
