Hi Guozhang,

I think I got a bit ahead of myself in describing the situation. I had an attachment with the context in detail; maybe it was filtered out. Let's try again =)
We have a topology looking something like this:

input-topic [20 partitions, compacted]
  |
use-case-repartition [20 partitions, infinite retention, segment.ms=10min]
  |
use-case-changelog

We previously hit the TooManyOpenFiles issue and "solved" it by raising the limit to something extreme. Later we found out that we wanted replication factor 3 on all internal topics, so we reset the app and BOOM, now we hit a too-many-memory-mapped-files limit instead.

The input topic contains 30 days of data, and we have records in pretty much every 10-minute window for every partition. This means that if nothing consumes the repartition topic, we end up with:

6 (10-min slots per hour) * 24 hours * 30 days * 20 partitions * 3 (.index, .log, .timeindex files) * 3 (replication factor) / 5 (brokers in cluster) = 155.520 open files per broker

just to have this repartition topic in place.

You might say: no problem, the data would be deleted and you would never reach such high numbers? That does not seem to be the case. What happened in our case is that, due to how the broker multiplexes the topic partitions for the subscribers, the streams application piled up all the repartition records, and only once it had caught up on the input topic did the downstream processing start. I see this as a design flaw in some component, probably the broker; it can't be the desired behaviour. How many open files would I need to allow for a year of data when resetting/reprocessing an application?

By adding more threads than input topic partitions, I managed to force the broker to hand out these records earlier, and the issue was mitigated. Ideally, downstream records should be processed close in time to their source records.

Let's take one partition containing 1.000.000 records. This is the behaviour I have observed (somewhat simplified):

Time    Consumer offset    Records in      Consumer offset      Records in
        input topic        input topic     repartition topic    repartition topic
00:00           0           1.000.000               0                    0
00:01     100.000           1.000.000               0              100.000
00:02     200.000           1.000.000               0              200.000
00:03     300.000           1.000.000               0              300.000
00:04     400.000           1.000.000               0              400.000
00:05     500.000           1.000.000               0              500.000
00:06     600.000           1.000.000               0              600.000
00:07     700.000           1.000.000               0              700.000
00:08     800.000           1.000.000               0              800.000
00:09     900.000           1.000.000               0              900.000
00:10   1.000.000           1.000.000               0            1.000.000
00:11   1.000.000           1.000.000         100.000            1.000.000
00:12   1.000.000           1.000.000         200.000            1.000.000
00:13   1.000.000           1.000.000         300.000            1.000.000
00:14   1.000.000           1.000.000         400.000            1.000.000
00:15   1.000.000           1.000.000         500.000            1.000.000
00:16   1.000.000           1.000.000         600.000            1.000.000
00:17   1.000.000           1.000.000         700.000            1.000.000
00:18   1.000.000           1.000.000         800.000            1.000.000
00:19   1.000.000           1.000.000         900.000            1.000.000
00:20   1.000.000           1.000.000       1.000.000            1.000.000

As you can see, there is no parallel execution, because the broker does not hand out any records from the repartition topic until the input topic is depleted. By adding more threads than input partitions I managed to mitigate this behaviour somewhat, but it is still far from balanced.
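(For reference, a minimal sketch of how that workaround can be wired up; the application id, bootstrap servers and thread count below are placeholders, assuming a single Streams instance running all threads.)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MoreThreadsWorkaround {
    public static Properties build() {
        Properties props = new Properties();
        // Placeholder application id and bootstrap servers, not our real values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "use-case-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        // More stream threads than the 20 input partitions, so some threads end up
        // assigned only repartition-topic tasks and start draining the repartition
        // topic before the input topic is fully caught up.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 24);
        return props;
    }
}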
Ideally, in such a situation where we rebuild stream state, I would expect behaviour more like this:

Time    Consumer offset    Records in      Consumer offset      Records in
        input topic        input topic     repartition topic    repartition topic
00:00           0           1.000.000               0                    0
00:01     100.000           1.000.000               0              100.000
00:02     200.000           1.000.000         100.000              200.000
00:03     300.000           1.000.000         200.000              300.000
00:04     400.000           1.000.000         300.000              400.000
00:05     500.000           1.000.000         400.000              500.000
00:06     600.000           1.000.000         500.000              600.000
00:07     700.000           1.000.000         600.000              700.000
00:08     800.000           1.000.000         700.000              800.000
00:09     900.000           1.000.000         800.000              900.000
00:10   1.000.000           1.000.000         900.000            1.000.000
00:11   1.000.000           1.000.000       1.000.000            1.000.000

Kind regards
Niklas

On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Niklas,
>
> If you can monitor your repartition topic's consumer lag, and it is
> increasing consistently, it means your downstream processor simply cannot
> keep up with the throughput of the upstream processor. Usually that means
> your downstream operators are heavier (e.g. aggregations and joins, which
> are all stateful) than your upstream ones (e.g. simply shuffling the data
> to repartition topics), and since task assignment only considers a task as
> the smallest unit of work and does not differentiate between "heavy" and
> "light" tasks, such an imbalance of task assignment may happen. At the
> moment, to resolve this you should add more resources to make sure the
> heavy tasks get enough computational resources assigned (more threads,
> e.g.).
>
> If your observed consumer lag plateaus after increasing to some point, it
> means your consumer can actually keep up with some constant lag; if you
> hit your open file limits before seeing this, you either need to increase
> your open file limits, OR you can simply increase the segment size to
> reduce the number of files, via "StreamsConfig.TOPIC_PREFIX" to set the
> value of TopicConfig.SEGMENT_BYTES_CONFIG.
>
>
> Guozhang
>
>
> On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
>
> > Hi Kafka Devs & Users,
> >
> > We recently had an issue where we processed a lot of old data and we
> > crashed our brokers due to too many memory mapped files.
> > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > suboptimal in terms of resource management. (Keeping all files open all
> > the time; maybe there should be something managing this more on-demand?)
> >
> > In the issue I described, the repartition topic was produced very fast,
> > but not consumed, causing a lot of segments and files to be open at the
> > same time.
> >
> > I have worked around the issue by making sure I have more threads than
> > partitions, to force tasks to subscribe to internal topics only, but it
> > seems a bit hacky, and maybe there should be some guidance in the
> > documentation if this is considered part of the design.
> >
> > After quite some testing and reverse engineering of the code, it seems
> > that the nature of this imbalance lies in how the broker multiplexes the
> > consumed topic-partitions.
> >
> > I have attached a slide that I will present to my team to explain the
> > issue in a bit more detail; it might be good to check it out to
> > understand the context.
> >
> > Any thoughts about my findings and concerns?
> >
> > Kind regards
> > Niklas
> >
>
>
> --
> -- Guozhang
>
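(Adding below the quote for anyone hitting the same limit: a minimal sketch of the segment-size suggestion Guozhang describes above, assuming a plain Properties-based Streams setup; the 512 MB value is only illustrative, not a recommendation.)

import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicSegmentConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Prefixing a topic-level config with StreamsConfig.TOPIC_PREFIX (via topicPrefix())
        // applies it to the internal repartition/changelog topics the application creates.
        // Larger segments mean fewer .log/.index/.timeindex files open per partition.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
                  512 * 1024 * 1024); // 512 MB per segment, an illustrative value
        return props;
    }
}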