Hi Guozhang,

I think I got a bit ahead of myself when describing the situation. I had an
attachment with the context in detail; maybe it was filtered out. Let's try
again =)

We have a topology looking something like this:

input-topic[20 partitions, compacted]
    |
use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
    |
use-case-changelog
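
In code, the shape is roughly the following. This is only a minimal sketch:
the key extraction is hypothetical, a plain count stands in for our real
aggregation, and it assumes a Streams version where Grouped.as(...) is
available for naming the repartition topic:

  import org.apache.kafka.common.serialization.Serdes;
  import org.apache.kafka.streams.StreamsBuilder;
  import org.apache.kafka.streams.kstream.Consumed;
  import org.apache.kafka.streams.kstream.Grouped;
  import org.apache.kafka.streams.kstream.Materialized;

  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
         // hypothetical re-keying step; this is what forces the repartition topic
         .selectKey((key, value) -> extractUseCaseKey(value))
         .groupByKey(Grouped.as("use-case"))     // -> <app-id>-use-case-repartition
         .count(Materialized.as("use-case"));    // -> <app-id>-use-case-changelog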

We have previously hit the TooManyOpenFiles issue and "solved" it by
raising the limit to something extreme.
Later we found out that we wanted replication factor 3 on all internal
topics, so we reset the app and BOOM, now we hit a too-many-memory-mapped-files
limit instead.

The input topic contains 30 days of data, with records in pretty much every
10-minute window for every partition.
This means that if nothing consumes the repartition topic, we end up with
6 (10-minute slots per hour) * 24 hours * 30 days * 20 partitions * 3 files
per segment (.log, .index, .timeindex) * 3 (replication factor) / 5 brokers
in the cluster = 155.520 open files per broker, just to have this
repartition topic in place.
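
Spelled out with the same numbers:

  6 segments/hour * 24 hours * 30 days             =   4.320 segments per partition
  4.320 segments * 20 partitions * 3 files/segment = 259.200 files
  259.200 files * 3 replicas                       = 777.600 files across the cluster
  777.600 files / 5 brokers                        = 155.520 open files per broker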

You might say: no problem, since the repartition topic gets deleted as it
is consumed, you would never reach such high numbers? That does not seem to
be the case.
What happened in our case is that, due to how the broker multiplexes the
topic partitions for its subscribers, the streams application piled up all
the repartition records, and only once it had caught up on the input topic
did the downstream processing start. I see this as a design flaw in some
component, probably the broker; it can't be the desired behaviour. How many
open files would I need to be allowed to have when resetting/reprocessing
an application over a year of data?

By adding more threads than input topic partitions, I managed to force the
broker to hand out these records earlier, and the issue was mitigated.
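
For reference, this is roughly what the workaround looks like, together
with the segment-size knob Guozhang mentions below. The values are only
illustrative, and the application id and bootstrap server are made up:

  import java.util.Properties;
  import org.apache.kafka.common.config.TopicConfig;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "use-case-app");    // hypothetical
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // hypothetical
  // Workaround: more threads than the 20 input partitions, so some threads
  // end up owning only repartition-topic tasks and consume them earlier.
  props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 24);
  // Larger segments on internal topics = fewer open files; only takes effect
  // when Streams creates the internal topics.
  props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 512 * 1024 * 1024);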

Ideally the downstream records should be processed close in time to the
source records.

Let's take one partition containing 1.000.000 records; this is the
behaviour I have observed (somewhat simplified):

Time     Input consumer    Records in     Repartition        Records in
         offset            input topic    consumer offset    repartition topic
00:00    0                 1.000.000      0                  0
00:01    100.000           1.000.000      0                  100.000
00:02    200.000           1.000.000      0                  200.000
00:03    300.000           1.000.000      0                  300.000
00:04    400.000           1.000.000      0                  400.000
00:05    500.000           1.000.000      0                  500.000
00:06    600.000           1.000.000      0                  600.000
00:07    700.000           1.000.000      0                  700.000
00:08    800.000           1.000.000      0                  800.000
00:09    900.000           1.000.000      0                  900.000
00:10    1.000.000         1.000.000      0                  1.000.000
00:11    1.000.000         1.000.000      100.000            1.000.000
00:12    1.000.000         1.000.000      200.000            1.000.000
00:13    1.000.000         1.000.000      300.000            1.000.000
00:14    1.000.000         1.000.000      400.000            1.000.000
00:15    1.000.000         1.000.000      500.000            1.000.000
00:16    1.000.000         1.000.000      600.000            1.000.000
00:17    1.000.000         1.000.000      700.000            1.000.000
00:18    1.000.000         1.000.000      800.000            1.000.000
00:19    1.000.000         1.000.000      900.000            1.000.000
00:20    1.000.000         1.000.000      1.000.000          1.000.000

As you can see, there is no parallel execution, because the broker does not
hand out any records from the repartition topic until the input topic is
depleted.
By adding more threads than input partitions I managed to mitigate this
behaviour somewhat, but it is still nowhere near balanced.
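
(For reference, we watched the two consumer offsets above with the standard
consumer-group tool; the group name is just our application id and is made
up here:)

  bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 \
      --describe --group use-case-app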

Ideally, in a situation like this where we rebuild stream state, I would
rather expect behaviour like this:

Time     Input consumer    Records in     Repartition        Records in
         offset            input topic    consumer offset    repartition topic
00:00    0                 1.000.000      0                  0
00:01    100.000           1.000.000      0                  100.000
00:02    200.000           1.000.000      100.000            200.000
00:03    300.000           1.000.000      200.000            300.000
00:04    400.000           1.000.000      300.000            400.000
00:05    500.000           1.000.000      400.000            500.000
00:06    600.000           1.000.000      500.000            600.000
00:07    700.000           1.000.000      600.000            700.000
00:08    800.000           1.000.000      700.000            800.000
00:09    900.000           1.000.000      800.000            900.000
00:10    1.000.000         1.000.000      900.000            1.000.000
00:11    1.000.000         1.000.000      1.000.000          1.000.000


Kind regards
Niklas

On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Niklas,
>
> If you can monitor your repartition topic's consumer lag, and it is
> increasing consistently, it means your downstream processor simply cannot
> keep up with the throughput of the upstream processor. Usually it means
> your downstream operators are heavier (e.g. aggregations, joins that are
> all stateful) than your upstream ones (e.g. simply shuffling the data to
> repartition topics), and since task assignment only considers a task as
> the smallest unit of work and does not differentiate "heavy" and "light"
> tasks, such an imbalance of task assignment may happen. At the moment, to
> resolve this you should add more resources to make sure the heavy tasks
> get enough computational resources assigned (e.g. more threads).
>
> If your observed consumer lag plateaus after increasing to some point,
> it means your consumer can actually keep up, with some constant lag; if you
> hit your open file limits before seeing this, you either need to increase
> your open file limits, OR you can simply increase the segment size to
> reduce the number of files via "StreamsConfig.TOPIC_PREFIX" to set the
> value of TopicConfig.SEGMENT_BYTES_CONFIG.
>
>
> Guozhang
>
>
> On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
>
> > Hi Kafka Devs & Users,
> >
> > We recently had an issue where we processed a lot of old data and we
> > crashed our brokers due to too many memory mapped files.
> > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > suboptimal in terms of resource management. (Keeping all files open
> > all the time, maybe there should be something managing this more
> > on-demand?)
> >
> > In the issue I described, the repartition topic was produced very fast,
> > but not consumed, causing a lot of segments and files to be open at the
> > same time.
> >
> > I have worked around the issue by making sure I have more threads than
> > partitions to force tasks to subscribe to internal topics only, but it
> > seems a bit hacky, and maybe there should be some guidance in the
> > documentation if this is considered part of the design.
> >
> > After quite some testing and code reversing it seems that the nature of
> > this imbalance lies within how the broker multiplexes the consumed
> > topic-partitions.
> >
> > I have attached a slide that I will present to my team to explain the
> > issue in a bit more detail; it might be good to check it out to
> > understand the context.
> >
> > Any thoughts about my findings and concerns?
> >
> > Kind regards
> > Niklas
> >
>
>
> --
> -- Guozhang
>
