I see (btw, attachments are usually not allowed on the AK mailing list, but if
you have it somewhere like gitcode and can share the URL, that works).

Could you let me know how many physical cores you have in total hosting
your app and how many threads you configured? From your current
description there should be at least 40 tasks (20 reading from the source
topics and writing to the repartition topics, and 20 reading from the
repartition topics), and I'd like to know how these tasks are assigned to
threads, and how many threads the hardware can actually execute in parallel.
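
If it helps, printing the topology description shows the sub-topologies, from
which the task count follows (2 sub-topologies x 20 partitions = 40 tasks). A
minimal sketch, assuming the 2.x Java DSL and a StreamsBuilder named "builder":

    // Each sub-topology becomes one task per input partition, so the
    // printed description should list two sub-topologies for your app.
    Topology topology = builder.build();
    System.out.println(topology.describe());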


Guozhang


On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn <niklas.l...@gmail.com> wrote:

> I have to double-check which broker version we run in production, but when
> testing and verifying the issue locally I did reproduce it with both broker
> and client version 2.1.0.
>
> Kind regards
> Niklas
>
> On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I see.
> >
> > What you described is a known issue in older versions of Kafka: some
> > high-traffic topics in bootstrap mode may effectively "starve" other
> > topics in the fetch response, since brokers used to naively fill in bytes
> > until the max.bytes configuration was met and then return. This was fixed
> > in version 1.1 via incremental fetch requests:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> >
> > The basic idea is to not always request topics in the same order, like A,
> > B, C; instead, if the previous request asked for A, B, C and all its data
> > came from A, the next request would ask for B, C, A, and so on. So if you
> > are on an older version of Kafka, I'd suggest you upgrade to a newer one.
> >
> > If you cannot upgrade at the moment, another suggestion, as I mentioned
> > above, is to increase the segment size so that you end up with much
> > larger, and hence fewer, segment files.
> >
> > Guozhang
> >
> >
> > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I think I got a bit ahead of myself in describing the situation. I had an
> > > attachment with the context in detail, but maybe it was filtered out. Let's
> > > try again =)
> > >
> > > We have a topology looking something like this:
> > >
> > > input-topic[20 partitions, compacted]
> > >     |
> > > use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
> > >     |
> > > use-case-changelog
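> > >
> > > For reference, a rough sketch of what this looks like in the DSL (the names
> > > and the re-keying logic are simplified placeholders, not our actual code):
> > >
> > >     StreamsBuilder builder = new StreamsBuilder();
> > >     builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
> > >            // deriveKey is a placeholder; changing the key is what forces
> > >            // the use-case-repartition topic
> > >            .selectKey((k, v) -> deriveKey(v))
> > >            .groupByKey(Grouped.as("use-case"))
> > >            // the store behind the aggregation is backed by use-case-changelog
> > >            .count(Materialized.as("use-case"));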
> > >
> > > We have previously hit the TooManyOpenFiles issue and "solved" it by
> > > raising the limit to something extreme.
> > > Later we decided that we wanted replication factor 3 on all internal
> > > topics, so we reset the app and BOOM, now we hit the limit on memory
> > > mapped files instead.
> > >
> > > The input topic contains 30 days of data, where we pretty much have
> > > records in every 10-minute window for every partition.
> > > This means that if nothing consumes the repartition topic we will have
> > > 6 (10-minute slots per hour) * 24 hours * 30 days * 20 partitions
> > > * 3 files per segment (.index, .log, .timeindex) * 3 replication factor
> > > / 5 brokers in the cluster = 155.520 open files per broker, just to have
> > > this repartition topic in place.
> > >
> > > You might say: yeah, but no problem, the segments would be deleted and you
> > > would never reach such high numbers? That doesn't seem to be the case.
> > > What happened in our case is that, due to how the broker multiplexes the
> > > topic partitions for the subscribers, the streams application piled up all
> > > the repartition records, and only once it had caught up on the input topic
> > > did the downstream processing start. I see this as a design flaw in some
> > > component, probably the broker; it can't be the desired behaviour. How many
> > > open files would I need to be allowed with a year of data when
> > > resetting/reprocessing an application?
> > >
> > > By adding more threads than input topic partitions, I managed to force the
> > > broker to give out these records earlier, and the issue was mitigated.
> > >
> > > Ideally the downstream records should be processed close in time to the
> > > source records.
> > >
> > > Let's take one partition containing 1.000.000 records; this is the observed
> > > behaviour I have seen (somewhat simplified):
> > >
> > > Time     Consumer offset   Records in    Consumer offset     Records in
> > >          input topic       input topic   repartition topic   repartition topic
> > > 00:00              0         1.000.000                   0                   0
> > > 00:01        100.000         1.000.000                   0             100.000
> > > 00:02        200.000         1.000.000                   0             200.000
> > > 00:03        300.000         1.000.000                   0             300.000
> > > 00:04        400.000         1.000.000                   0             400.000
> > > 00:05        500.000         1.000.000                   0             500.000
> > > 00:06        600.000         1.000.000                   0             600.000
> > > 00:07        700.000         1.000.000                   0             700.000
> > > 00:08        800.000         1.000.000                   0             800.000
> > > 00:09        900.000         1.000.000                   0             900.000
> > > 00:10      1.000.000         1.000.000                   0           1.000.000
> > > 00:11      1.000.000         1.000.000             100.000           1.000.000
> > > 00:12      1.000.000         1.000.000             200.000           1.000.000
> > > 00:13      1.000.000         1.000.000             300.000           1.000.000
> > > 00:14      1.000.000         1.000.000             400.000           1.000.000
> > > 00:15      1.000.000         1.000.000             500.000           1.000.000
> > > 00:16      1.000.000         1.000.000             600.000           1.000.000
> > > 00:17      1.000.000         1.000.000             700.000           1.000.000
> > > 00:18      1.000.000         1.000.000             800.000           1.000.000
> > > 00:19      1.000.000         1.000.000             900.000           1.000.000
> > > 00:20      1.000.000         1.000.000           1.000.000           1.000.000
> > >
> > > As you can see, there is no parallel execution, and that is because the
> > > broker does not hand out any records from the repartition topic until the
> > > input topic is depleted.
> > > By adding more threads than input partitions I managed to mitigate this
> > > behaviour somewhat, but it is still not close to balanced.
> > >
> > > Ideally, in such a situation where we rebuild stream state, I would rather
> > > expect a behaviour like this:
> > >
> > > Time     Consumer offset   Records in    Consumer offset     Records in
> > >          input topic       input topic   repartition topic   repartition topic
> > > 00:00              0         1.000.000                   0                   0
> > > 00:01        100.000         1.000.000                   0             100.000
> > > 00:02        200.000         1.000.000             100.000             200.000
> > > 00:03        300.000         1.000.000             200.000             300.000
> > > 00:04        400.000         1.000.000             300.000             400.000
> > > 00:05        500.000         1.000.000             400.000             500.000
> > > 00:06        600.000         1.000.000             500.000             600.000
> > > 00:07        700.000         1.000.000             600.000             700.000
> > > 00:08        800.000         1.000.000             700.000             800.000
> > > 00:09        900.000         1.000.000             800.000             900.000
> > > 00:10      1.000.000         1.000.000             900.000           1.000.000
> > > 00:11      1.000.000         1.000.000           1.000.000           1.000.000
> > >
> > >
> > > Kind regards
> > > Niklas
> > >
> > > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Niklas,
> > > >
> > > > If you can monitor your repartition topic's consumer lag and it is
> > > > increasing consistently, it means your downstream processors simply cannot
> > > > keep up with the throughput of the upstream processors. Usually that means
> > > > your downstream operators are heavier (e.g. aggregations and joins, which
> > > > are all stateful) than your upstream ones (e.g. simply shuffling the data
> > > > to repartition topics), and since task assignment only considers a task as
> > > > the smallest unit of work and does not differentiate "heavy" and "light"
> > > > tasks, such an imbalance of task assignment may happen. At the moment, to
> > > > resolve this you should add more resources to make sure the heavy tasks
> > > > get enough computational resources assigned (more threads, e.g.).
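> > > >
> > > > For example (just a sketch; the thread count and names below are
> > > > placeholders, not a recommendation):
> > > >
> > > >     Properties props = new Properties();
> > > >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
> > > >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
> > > >     // run more stream threads per instance so the heavy, stateful tasks
> > > >     // reading from the repartition topic get their own threads
> > > >     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);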
> > > >
> > > > If your observed consumer lag plateaus after increasing to some point, it
> > > > means your consumer can actually keep up, with some constant lag; if you
> > > > hit your open file limits before seeing this, you either need to increase
> > > > your open file limits, OR you can simply increase the segment size to
> > > > reduce the number of files, using "StreamsConfig.TOPIC_PREFIX" to set the
> > > > value of TopicConfig.SEGMENT_BYTES_CONFIG.
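> > > >
> > > > A minimal sketch of what that could look like, using the topicPrefix
> > > > helper (the 512 MB value is only an example; pick something that fits
> > > > your traffic and retention):
> > > >
> > > >     // applies to the internal (repartition / changelog) topics that the
> > > >     // Streams application creates
> > > >     props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
> > > >               512 * 1024 * 1024);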
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
> > > >
> > > > > Hi Kafka Devs & Users,
> > > > >
> > > > > We recently had an issue where we processed a lot of old data and we
> > > > > crashed our brokers due to too many memory mapped files.
> > > > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > > > suboptimal in terms of resource management. (Keeping all files open all
> > > > > the time; maybe there should be something managing this more on-demand?)
> > > > >
> > > > > In the issue I described, the repartition topic was produced to very
> > > > > fast, but not consumed, causing a lot of segments and files to be open
> > > > > at the same time.
> > > > >
> > > > > I have worked around the issue by making sure I have more threads than
> > > > > partitions, to force some tasks to subscribe to the internal topics
> > > > > only, but that seems a bit hacky, and maybe there should be some
> > > > > guidance in the documentation if this is considered part of the design.
> > > > >
> > > > > After quite some testing and code reverse-engineering, it seems that the
> > > > > nature of this imbalance lies in how the broker multiplexes the consumed
> > > > > topic-partitions.
> > > > >
> > > > > I have attached a slide that I will present to my team to explain the
> > > > > issue in a bit more detail; it might be good to check it out to
> > > > > understand the context.
> > > > >
> > > > > Any thoughts about my findings and concerns?
> > > > >
> > > > > Kind regards
> > > > > Niklas
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
