I have to double-check which broker version we run in production, but when
testing and verifying the issue locally I did reproduce it with both broker
and client version 2.1.0.

Kind regards
Niklas

On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wangg...@gmail.com> wrote:

> I see.
>
> What you described is a known issue in older versions of Kafka: some
> high-traffic topics in bootstrap mode may effectively "starve" other topics
> in the fetch response, since brokers used to naively fill the response up to
> the max.bytes configuration and return. This is fixed in version 1.1 via
> incremental fetch requests:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>
> The basic idea is to not always request topics in the same order, like
> A, B, C; instead, if the previous request asked for topics A, B, C and got
> all of its data from A, then the next request would be ordered B, C, A, etc.
> So if you are on an older version of Kafka I'd suggest you upgrade to a
> newer version.
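> (For illustration only, a toy sketch of that rotation idea; this is not the
> actual broker code, and the class and method names are made up.)
>
> import java.util.ArrayDeque;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Deque;
> import java.util.List;
>
> // Toy illustration: a topic that was fully served in the previous round is
> // moved to the back of the next fetch order, so no topic is starved forever.
> final class FetchOrderSketch {
>     private final Deque<String> order =
>         new ArrayDeque<>(Arrays.asList("A", "B", "C"));
>
>     List<String> nextRequestOrder(String fullyServedLastRound) {
>         if (fullyServedLastRound != null && order.remove(fullyServedLastRound)) {
>             order.addLast(fullyServedLastRound); // A,B,C -> B,C,A
>         }
>         return new ArrayList<>(order);
>     }
> }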
>
> If you cannot upgrade atm, another suggestion, as I mentioned above, is to
> increase the segment size so you have much larger, and hence fewer, segment
> files.
>
> Guozhang
>
>
> On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > I think I went a bit ahead of myself in describing the situation; I had an
> > attachment with the context in detail, but maybe it was filtered out. Let's
> > try again =)
> >
> > We have a topology looking something like this:
> >
> > input-topic[20 partitions, compacted]
> >     |
> > use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
> >     |
> > use-case-changelog
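> >
> > (For context, a minimal Kafka Streams sketch of roughly that shape; the
> > topic, store and helper names are placeholders rather than our real code.)
> >
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.common.utils.Bytes;
> > import org.apache.kafka.streams.StreamsBuilder;
> > import org.apache.kafka.streams.Topology;
> > import org.apache.kafka.streams.kstream.Consumed;
> > import org.apache.kafka.streams.kstream.Grouped;
> > import org.apache.kafka.streams.kstream.Materialized;
> > import org.apache.kafka.streams.state.KeyValueStore;
> >
> > // Placeholder topology: selectKey forces an internal "...-repartition"
> > // topic, and the aggregation state is backed by a "...-changelog" topic.
> > final class UseCaseTopologySketch {
> >
> >     static Topology build() {
> >         StreamsBuilder builder = new StreamsBuilder();
> >         builder.stream("input-topic",
> >                        Consumed.with(Serdes.String(), Serdes.String()))
> >                .selectKey((key, value) -> deriveUseCaseKey(value))
> >                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
> >                .aggregate(() -> "",
> >                           (key, value, agg) -> agg + value,
> >                           Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
> >                               "use-case-store"));
> >         return builder.build();
> >     }
> >
> >     private static String deriveUseCaseKey(String value) {
> >         return value; // placeholder for the real re-keying logic
> >     }
> > }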
> >
> > We have previously hit the TooManyOpenFiles issue and "solved" it by
> > raising the limit to something extreme.
> > Later we found out that we wanted replication factor 3 on all internal
> > topics, so we reset the app and BOOM, now we hit a too-many-memory-mapped-files
> > limit instead.
> >
> > The input topic contains 30 days of data, where we pretty much have
> > records in every 10-minute window for every partition.
> > This means that if nothing consumes the repartition topic, we will have
> > 6 (10-minute slots) * 24 hours * 30 days * 20 partitions * 3 (.index, .log,
> > .timeindex files) * 3 (replication factor) / 5 brokers in the cluster =
> > 155.520 open files just to have this repartition topic in place.
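> >
> > (The same back-of-the-envelope arithmetic as a tiny runnable check, in case
> > anyone wants to plug in their own numbers; all values are taken from the
> > setup described above.)
> >
> > public class OpenFileEstimate {
> >     public static void main(String[] args) {
> >         long tenMinuteSegmentsPerPartition = 6L * 24 * 30; // slots/hour * hours * days
> >         long filesPerSegment = 3;                          // .log, .index, .timeindex
> >         long partitions = 20;
> >         long replicationFactor = 3;
> >         long brokers = 5;
> >         long openFilesPerBroker = tenMinuteSegmentsPerPartition * filesPerSegment
> >                 * partitions * replicationFactor / brokers;
> >         System.out.println(openFilesPerBroker); // prints 155520
> >     }
> > }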
> >
> > You would say, yeah, but no problem, as segments would be deleted and you
> > would never reach such high numbers? But that doesn't seem to be the case.
> > What happened in our case is that, due to how the broker multiplexes the
> > topic partitions for the subscribers, the streams application piled up all
> > the repartition records, and only once it had caught up did all the
> > downstream processing start taking place. I do see this as a design flaw in
> > some component, probably the broker; it can't be the desired behaviour. How
> > many files would I need to be able to have open with a year of data when
> > resetting/reprocessing an application?
> >
> > By adding more threads than input topic partitions, I managed to force the
> > broker to give out these records earlier, and the issue was mitigated.
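> >
> > (Concretely, the workaround is just raising the thread count above the
> > number of input partitions in the Streams config; the application id,
> > bootstrap servers and exact count below are placeholders.)
> >
> > import java.util.Properties;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > final class StreamsPropsSketch {
> >     static Properties workaroundProps() {
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "use-case-app");   // placeholder
> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
> >         // Workaround: more threads than the 20 input partitions, so some
> >         // threads own only tasks that read the internal repartition topic.
> >         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 24);
> >         return props;
> >     }
> > }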
> >
> > Ideally, the downstream records should be processed close in time to their
> > source records.
> >
> > Let's take one partition containing 1.000.000 records; this is the observed
> > behaviour I have seen (somewhat simplified):
> > Time   Input consumer offset  Input records  Repartition consumer offset  Repartition records
> > 00:00  0                      1.000.000      0                            0
> > 00:01  100.000                1.000.000      0                            100.000
> > 00:02  200.000                1.000.000      0                            200.000
> > 00:03  300.000                1.000.000      0                            300.000
> > 00:04  400.000                1.000.000      0                            400.000
> > 00:05  500.000                1.000.000      0                            500.000
> > 00:06  600.000                1.000.000      0                            600.000
> > 00:07  700.000                1.000.000      0                            700.000
> > 00:08  800.000                1.000.000      0                            800.000
> > 00:09  900.000                1.000.000      0                            900.000
> > 00:10  1.000.000              1.000.000      0                            1.000.000
> > 00:11  1.000.000              1.000.000      100.000                      1.000.000
> > 00:12  1.000.000              1.000.000      200.000                      1.000.000
> > 00:13  1.000.000              1.000.000      300.000                      1.000.000
> > 00:14  1.000.000              1.000.000      400.000                      1.000.000
> > 00:15  1.000.000              1.000.000      500.000                      1.000.000
> > 00:16  1.000.000              1.000.000      600.000                      1.000.000
> > 00:17  1.000.000              1.000.000      700.000                      1.000.000
> > 00:18  1.000.000              1.000.000      800.000                      1.000.000
> > 00:19  1.000.000              1.000.000      900.000                      1.000.000
> > 00:20  1.000.000              1.000.000      1.000.000                    1.000.000
> >
> > As you can see, there is no parallel execution, because the broker does not
> > give out any records from the repartition topic until the input topic is
> > depleted.
> > By adding more threads than input partitions I managed to mitigate this
> > behaviour somewhat, but it is still not close to balanced.
> >
> > Ideally, in such a situation where we rebuild stream state, I would expect
> > behaviour more like this:
> >
> > Time   Input consumer offset  Input records  Repartition consumer offset  Repartition records
> > 00:00  0                      1.000.000      0                            0
> > 00:01  100.000                1.000.000      0                            100.000
> > 00:02  200.000                1.000.000      100.000                      200.000
> > 00:03  300.000                1.000.000      200.000                      300.000
> > 00:04  400.000                1.000.000      300.000                      400.000
> > 00:05  500.000                1.000.000      400.000                      500.000
> > 00:06  600.000                1.000.000      500.000                      600.000
> > 00:07  700.000                1.000.000      600.000                      700.000
> > 00:08  800.000                1.000.000      700.000                      800.000
> > 00:09  900.000                1.000.000      800.000                      900.000
> > 00:10  1.000.000              1.000.000      900.000                      1.000.000
> > 00:11  1.000.000              1.000.000      1.000.000                    1.000.000
> >
> >
> > Kind regards
> > Niklas
> >
> > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Niklas,
> > >
> > > If you monitor your repartition topic's consumer lag and it is increasing
> > > consistently, it means your downstream processor simply cannot keep up
> > > with the throughput of the upstream processor. Usually that means your
> > > downstream operators are heavier (e.g. aggregations and joins, which are
> > > all stateful) than your upstream ones (e.g. simply shuffling the data to
> > > repartition topics), and since task assignment only considers a task as
> > > the smallest unit of work and does not differentiate "heavy" and "light"
> > > tasks, such an imbalance in task assignment may happen. At the moment, to
> > > resolve this you should add more resources to make sure the heavy tasks
> > > get enough computational resources assigned (more threads, e.g.).
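> > >
> > > (For reference, one rough way to eyeball the lag on the internal
> > > repartition topics programmatically with the 2.x Java clients; the
> > > application id and bootstrap servers below are placeholders, and the
> > > consumer group id of a Streams app is its application.id.)
> > >
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import org.apache.kafka.clients.admin.AdminClient;
> > > import org.apache.kafka.clients.admin.AdminClientConfig;
> > > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > > import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> > > import org.apache.kafka.common.TopicPartition;
> > > import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> > >
> > > public class RepartitionLagCheck {
> > >     public static void main(String[] args) throws Exception {
> > >         String bootstrap = "broker:9092"; // placeholder
> > >         String appId = "use-case-app";    // placeholder, == application.id
> > >
> > >         Properties aprops = new Properties();
> > >         aprops.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
> > >         Properties cprops = new Properties();
> > >         cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
> > >         cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >                    ByteArrayDeserializer.class.getName());
> > >         cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >                    ByteArrayDeserializer.class.getName());
> > >
> > >         try (AdminClient admin = AdminClient.create(aprops);
> > >              KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cprops)) {
> > >             // Committed offsets of the Streams app, per topic-partition.
> > >             Map<TopicPartition, OffsetAndMetadata> committed =
> > >                 admin.listConsumerGroupOffsets(appId)
> > >                      .partitionsToOffsetAndMetadata().get();
> > >             // Log-end offsets for the same partitions, to compute the lag.
> > >             Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());
> > >             committed.forEach((tp, om) -> {
> > >                 if (om != null && tp.topic().endsWith("-repartition")) {
> > >                     System.out.println(tp + " lag=" + (end.get(tp) - om.offset()));
> > >                 }
> > >             });
> > >         }
> > >     }
> > > }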
> > >
> > > If your observed consumer lag plateaus after increasing to some point, it
> > > means your consumer can actually keep up, with some constant lag; if you
> > > hit your open-file limits before seeing this, you either need to increase
> > > your open-file limits, OR you can simply increase the segment size to
> > > reduce the number of files, via "StreamsConfig.TOPIC_PREFIX" to set the
> > > value of TopicConfig.SEGMENT_BYTES_CONFIG.
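> > >
> > > (Something along these lines in the Streams config; the sizes below are
> > > only illustrative values, and these settings apply to the internal
> > > repartition/changelog topics that Streams creates.)
> > >
> > > import java.util.Properties;
> > > import org.apache.kafka.common.config.TopicConfig;
> > > import org.apache.kafka.streams.StreamsConfig;
> > >
> > > final class InternalTopicConfigSketch {
> > >     static Properties segmentTuning() {
> > >         Properties props = new Properties();
> > >         // Larger segments => fewer segment files (and fewer mmapped index files).
> > >         props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
> > >                   Integer.toString(512 * 1024 * 1024));   // illustrative: 512 MB
> > >         // If time-based rolling (segment.ms) is what creates the many
> > >         // segments, it can be raised the same way.
> > >         props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG),
> > >                   Long.toString(24 * 60 * 60 * 1000L));   // illustrative: 1 day
> > >         return props;
> > >     }
> > > }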
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
> > >
> > > > Hi Kafka Devs & Users,
> > > >
> > > > We recently had an issue where we processed a lot of old data and we
> > > > crashed our brokers due to too many memory mapped files.
> > > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > > suboptimal in terms of resource management. (Keeping all files open all
> > > > the time, maybe there should be something managing this more on-demand?)
> > > >
> > > > In the issue I described, the repartition topic was produced very fast,
> > > > but not consumed, causing a lot of segments and files to be open at the
> > > > same time.
> > > >
> > > > I have worked around the issue by making sure I have more threads than
> > > > partitions, to force tasks to subscribe to internal topics only, but it
> > > > seems a bit hacky, and maybe there should be some guidance in the
> > > > documentation if this is considered part of the design.
> > > >
> > > > After quite some testing and reverse-engineering of the code, it seems
> > > > that the nature of this imbalance lies in how the broker multiplexes the
> > > > consumed topic-partitions.
> > > >
> > > > I have attached a slide that I will present to my team to explain the
> > > > issue in a bit more detail; it might be good to check it out to
> > > > understand the context.
> > > >
> > > > Any thoughts about my findings and concerns?
> > > >
> > > > Kind regards
> > > > Niklas
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
