Not a problem. Glad to hear you're not seeing it anymore. If it occurs again, please feel free to reach out to the community again.
Guozhang

On Thu, Jan 24, 2019 at 2:32 PM Niklas Lönn <niklas.l...@gmail.com> wrote:

> Hi,
>
> I have something good (and personally mysterious) to report.
>
> We do indeed run 1.1.x in production.
>
> And today, when I was almost finished cleaning up my test case for public display, I was forced by corp policies to update osx, and suddenly, once I had my test in a "non-hacky, improvised piece of iteration test code not asserting stuff" mode, I couldn't recreate the issue any more, not with the new or the old code.
>
> I suspect I was unlucky enough to hit some other issue in my os/firmware with very similar symptoms to what we had in production; I ran my test on another computer without this update and it was fine there as well.
>
> I guess that concludes that you are most likely right about this 1.1 bug, and I was just very unlucky to be able to recreate it locally due to other issues.
>
> Thanks for the support and rubber ducking :)
>
> Kind regards
> Niklas
>
> On Thu 24. Jan 2019 at 02:08, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I see (btw attachments are usually not allowed on the AK mailing list, but if you have it somewhere like gitcode and can share the url, that works).
> >
> > Could you let me know how many physical cores you have in total hosting your app, and how many threads you configured? From your current description there should be at least 40 tasks (20 reading from source topics and writing to repartition topics, and 20 reading from repartition topics), and I'd like to know how these tasks are assigned to threads, and how many threads may be executed in parallel by the hardware.
> >
> > Guozhang
> >
> > On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn <niklas.l...@gmail.com> wrote:
> >
> > > I have to double check what version of broker we run in production, but when testing and verifying the issue locally I did reproduce it with both broker and client version 2.1.0.
> > >
> > > Kind regards
> > > Niklas
> > >
> > > On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > I see.
> > > >
> > > > What you described is a known issue in older versions of Kafka: some high-traffic topics in bootstrap mode may effectively "starve" other topics in the fetch response, since brokers used to naively fill in the bytes that meet the max.bytes configuration and return. This is fixed in version 1.1 via incremental fetch requests:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > >
> > > > The basic idea is to not always request topics like A,B,C; instead, if the previous request asked for topics A,B,C and got all data from A, then the next request would be B,C,A, etc. So if you are on an older version of Kafka I'd suggest you upgrade to a newer version.
> > > >
> > > > If you cannot upgrade atm, another suggestion, as I mentioned above, is to change the segment sizes so you have much larger, and hence fewer, segment files.
> > > >
> > > > Guozhang
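(A toy sketch of the rotation idea described above, for illustration only; this is not the broker implementation, and the class and method names here are invented.)

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    // Illustrative only: a topic whose data was fully returned by the previous
    // fetch is rotated to the back of the request order, so a session that asked
    // for A,B,C and drained A would ask for B,C,A next time.
    public class FetchOrderSketch {
        private final Deque<String> order = new ArrayDeque<>(List.of("A", "B", "C"));

        public List<String> nextFetchOrder(boolean previousHeadFullyReturned) {
            if (previousHeadFullyReturned) {
                order.addLast(order.pollFirst()); // move the drained topic to the back
            }
            return List.copyOf(order);
        }
    }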
> > > > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > I think I got a bit ahead of myself in describing the situation; I had an attachment with the context in detail, maybe it was filtered out. Let's try again =)
> > > > >
> > > > > We have a topology looking something like this:
> > > > >
> > > > > input-topic [20 partitions, compacted]
> > > > >   |
> > > > > use-case-repartition [20 partitions, infinite retention, segment.ms=10min]
> > > > >   |
> > > > > use-case-changelog
> > > > >
> > > > > We have previously hit the TooManyOpenFiles issue and "solved" it by raising the limit to something extreme.
> > > > > Later we found out that we wanted replication factor 3 on all internal topics, so we reset the app and BOOM, now we hit a too-many-memory-mapped-files limit instead.
> > > > >
> > > > > The input topic contains 30 days of data, where we pretty much have records in every 10-minute window for every partition.
> > > > > This means that if nothing consumes the repartition topic we will have 6 (10-min slots) * 24 hours * 30 days * 20 partitions * 3 (.index, .log, .timeindex files) * 3 replication factor / 5 brokers in the cluster = *155.520* open files per broker just to have this repartition topic in place.
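(A minimal sketch of the arithmetic above, only restating the numbers from the thread; the class name is invented for illustration.)

    public class OpenFileEstimate {
        public static void main(String[] args) {
            long segmentsPerPartitionPerDay = 6 * 24; // segment.ms=10min => 6 segments per hour
            long days = 30;
            long partitions = 20;
            long filesPerSegment = 3;                 // .log, .index, .timeindex
            long replicationFactor = 3;
            long brokers = 5;

            long openFilesPerBroker = segmentsPerPartitionPerDay * days * partitions
                    * filesPerSegment * replicationFactor / brokers;

            System.out.println(openFilesPerBroker);   // 155520
        }
    }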
> > > > > You would say, yeah, but no problem, as it would be deleted and you would not reach such high numbers? That doesn't seem to be the case.
> > > > > What happened in our case is that, due to how the broker multiplexes the topic partitions for the subscribers, the streams application piled up all the repartition records, and only when it had caught up did all the downstream processing start taking place. I do see this as a design flaw in some component, probably the broker. It can't be the desired behaviour. How many open files do I need to be able to have open for a year of data when resetting/reprocessing an application?
> > > > >
> > > > > By adding more threads than input topic partitions, I managed to force the broker to give out these records earlier, and the issue was mitigated.
> > > > >
> > > > > Ideally the downstream records should be processed somewhere near in time to the source record.
> > > > >
> > > > > Let's take one partition containing 1.000.000 records; this is the observed behaviour I have seen (somewhat simplified):
> > > > >
> > > > > Time  | Consumer offset, input topic | Records in input topic | Consumer offset, repartition topic | Records in repartition topic
> > > > > 00:00 | 0                            | 1.000.000              | 0                                  | 0
> > > > > 00:01 | 100.000                      | 1.000.000              | 0                                  | 100.000
> > > > > 00:02 | 200.000                      | 1.000.000              | 0                                  | 200.000
> > > > > 00:03 | 300.000                      | 1.000.000              | 0                                  | 300.000
> > > > > 00:04 | 400.000                      | 1.000.000              | 0                                  | 400.000
> > > > > 00:05 | 500.000                      | 1.000.000              | 0                                  | 500.000
> > > > > 00:06 | 600.000                      | 1.000.000              | 0                                  | 600.000
> > > > > 00:07 | 700.000                      | 1.000.000              | 0                                  | 700.000
> > > > > 00:08 | 800.000                      | 1.000.000              | 0                                  | 800.000
> > > > > 00:09 | 900.000                      | 1.000.000              | 0                                  | 900.000
> > > > > 00:10 | 1.000.000                    | 1.000.000              | 0                                  | 1.000.000
> > > > > 00:11 | 1.000.000                    | 1.000.000              | 100.000                            | 1.000.000
> > > > > 00:12 | 1.000.000                    | 1.000.000              | 200.000                            | 1.000.000
> > > > > 00:13 | 1.000.000                    | 1.000.000              | 300.000                            | 1.000.000
> > > > > 00:14 | 1.000.000                    | 1.000.000              | 400.000                            | 1.000.000
> > > > > 00:15 | 1.000.000                    | 1.000.000              | 500.000                            | 1.000.000
> > > > > 00:16 | 1.000.000                    | 1.000.000              | 600.000                            | 1.000.000
> > > > > 00:17 | 1.000.000                    | 1.000.000              | 700.000                            | 1.000.000
> > > > > 00:18 | 1.000.000                    | 1.000.000              | 800.000                            | 1.000.000
> > > > > 00:19 | 1.000.000                    | 1.000.000              | 900.000                            | 1.000.000
> > > > > 00:20 | 1.000.000                    | 1.000.000              | 1.000.000                          | 1.000.000
> > > > >
> > > > > As you can see, there is no parallel execution, and it's because the broker does not give out any records from the repartition topic until the input topic is depleted.
> > > > > By adding more threads than input partitions I managed to mitigate this behaviour somewhat, but it is still not close to balanced.
> > > > >
> > > > > Ideally, in such a situation where we rebuild stream state, I would expect a behaviour more like this:
> > > > >
> > > > > Time  | Consumer offset, input topic | Records in input topic | Consumer offset, repartition topic | Records in repartition topic
> > > > > 00:00 | 0                            | 1.000.000              | 0                                  | 0
> > > > > 00:01 | 100.000                      | 1.000.000              | 0                                  | 100.000
> > > > > 00:02 | 200.000                      | 1.000.000              | 100.000                            | 200.000
> > > > > 00:03 | 300.000                      | 1.000.000              | 200.000                            | 300.000
> > > > > 00:04 | 400.000                      | 1.000.000              | 300.000                            | 400.000
> > > > > 00:05 | 500.000                      | 1.000.000              | 400.000                            | 500.000
> > > > > 00:06 | 600.000                      | 1.000.000              | 500.000                            | 600.000
> > > > > 00:07 | 700.000                      | 1.000.000              | 600.000                            | 700.000
> > > > > 00:08 | 800.000                      | 1.000.000              | 700.000                            | 800.000
> > > > > 00:09 | 900.000                      | 1.000.000              | 800.000                            | 900.000
> > > > > 00:10 | 1.000.000                    | 1.000.000              | 900.000                            | 1.000.000
> > > > > 00:11 | 1.000.000                    | 1.000.000              | 1.000.000                          | 1.000.000
> > > > >
> > > > > Kind regards
> > > > > Niklas
> > > > >
> > > > > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Niklas,
> > > > > > If you can monitor your repartition topic's consumer lag and it is increasing consistently, it means your downstream processor simply cannot keep up with the throughput of the upstream processor. Usually it means your downstream operators are heavier (e.g. aggregations, joins that are all stateful) than your upstream ones (e.g. simply shuffling the data to repartition topics), and since task assignment only considers a task as the smallest unit of work and does not differentiate "heavy" and "light" tasks, such an imbalance of task assignment may happen. At the moment, to resolve this you should add more resources to make sure the heavy tasks get enough computational resources assigned (more threads, e.g.).
> > > > > >
> > > > > > If your observed consumer lag plateaus after increasing to some point, it means your consumer can actually keep up with some constant lag; if you hit your open file limits before seeing this, it means you either need to increase your open file limits, OR you can simply increase the segment size to reduce the number of files, using "StreamsConfig.TOPIC_PREFIX" to set the value of TopicConfig.SEGMENT_BYTES_CONFIG.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Kafka Devs & Users,
> > > > > > >
> > > > > > > We recently had an issue where we processed a lot of old data and we crashed our brokers due to too many memory-mapped files.
> > > > > > > It seems to me that the nature of Kafka / Kafka Streams is a bit suboptimal in terms of resource management. (Keeping all files open all the time; maybe there should be something managing this more on-demand?)
> > > > > > >
> > > > > > > In the issue I described, the repartition topic was produced very fast, but not consumed, causing a lot of segments and files to be open at the same time.
> > > > > > >
> > > > > > > I have worked around the issue by making sure I have more threads than partitions to force tasks to subscribe to internal topics only, but it seems a bit hacky, and maybe there should be some guidance in the documentation if this is considered part of the design.
> > > > > > >
> > > > > > > After quite some testing and code reversing, it seems that the nature of this imbalance lies within how the broker multiplexes the consumed topic-partitions.
> > > > > > >
> > > > > > > I have attached a slide that I will present to my team to explain the issue in a bit more detail; it might be good to check it out to understand the context.
> > > > > > >
> > > > > > > Any thoughts about my findings and concerns?
> > > > > > >
> > > > > > > Kind regards
> > > > > > > Niklas
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
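(For reference, a minimal configuration sketch of the two knobs discussed in this thread: a larger segment size for the application's internal topics via the "topic." prefix, and more stream threads than input partitions. It assumes the Kafka Streams 2.x config API; the application id, bootstrap address, and concrete values are illustrative only.)

    import java.util.Properties;
    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsTuningSketch {
        public static Properties buildConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "use-case-app");      // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

            // Larger segments for the internal (repartition/changelog) topics created by
            // this application => fewer .log/.index/.timeindex files per partition.
            // 1 GiB is only an example value.
            props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024 * 1024);

            // The workaround mentioned in the thread: more threads than source partitions,
            // so some threads end up owning only repartition-topic tasks.
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 24); // > 20 input partitions
            return props;
        }
    }

Both knobs are applied by the Streams application itself, and the segment-size override only affects the internal topics the application creates; upgrading the brokers to 1.1+ for KIP-227 incremental fetch, as suggested earlier in the thread, addresses the starvation more fundamentally.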