Hi Ashish,

How are you performing the backfill operation? Over some time window? Can you share the details?
I mean, if it helps, you can check out https://www.ververica.com/blog/how-to-write-fast-flink-sql.

Regards

On Tue, Jun 25, 2024 at 4:30 PM Ashish Khatkar via user <user@flink.apache.org> wrote:

> Hi Xuyang,
>
> The input records are balanced across subtasks; with debloating buffers enabled, this subtask receives fewer records than the other subtasks.
>
>> If the differences among all subtasks are not significant, we might be encountering an IO bottleneck. In this case, we could try increasing the parallelism of this vertex, or, as Penny suggested, we could try to enhance the memory of the TM.
>
> I checked the k8s metrics for the taskmanagers and I don't see any IO issues; I can take another look at them around the time we started the backfill. Regarding parallelism, I don't think Flink SQL allows separate parallelism for a specific operator. I can try to increase the parallelism for the entire job (which will probably be counterproductive for the sources, as they have 20 partitions). Regarding memory, I have already given 72 GB of memory to each taskmanager (1 TM = 1 slot). We are currently migrating join use cases from an in-house system that we built on top of the DataStream API; that system never required this many resources, and we never faced any such issues for this job.
>
> On Tue, Jun 25, 2024 at 7:30 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, Ashish.
>>
>> Can you confirm whether, on the subtask page of this sink materializer node, the input records for each subtask are approximately the same?
>>
>> If the input records for subtask number 5 are significantly larger compared to the others, it signifies a serious data skew, and it would be necessary to modify the SQL appropriately to resolve this skew.
>>
>> If the differences among all subtasks are not significant, we might be encountering an IO bottleneck. In this case, we could try increasing the parallelism of this vertex, or, as Penny suggested, we could try to enhance the memory of the TM.
>>
>> --
>> Best!
>> Xuyang
>>
>> On 2024-06-24 21:28:58, "Penny Rastogi" <walls.fl...@gmail.com> wrote:
>>
>> Hi Ashish,
>> Can you check a few things?
>> 1. Is your source broker count also 20 for both topics?
>> 2. You can try increasing the state operation memory and reducing the disk I/O:
>>    - Increase the number of CU resources in a single slot.
>>    - Set optimization parameters:
>>      - taskmanager.memory.managed.fraction=x
>>      - state.backend.rocksdb.block.cache-size=x
>>      - state.backend.rocksdb.writebuffer.size=x
>> 3. If possible, try a left window join for your streams.
>>
>> Please share what sink you are using. Also, the per-operator, source, and sink throughput, if possible?
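For reference, a minimal flink-conf.yaml sketch of the knobs Penny and Xuyang mention above. The concrete values are illustrative placeholders, not tested recommendations, and would need to be sized against the 72 GB TaskManager processes:

    # Give the RocksDB state backend a larger share of each TaskManager's managed (off-heap) memory
    taskmanager.memory.managed.fraction: 0.6

    # RocksDB tuning parameters from the list above (placeholder sizes, adjust per workload)
    state.backend.rocksdb.block.cache-size: 512mb
    state.backend.rocksdb.writebuffer.size: 128mb

    # Already enabled for this job according to the original mail, shown only for completeness
    execution.checkpointing.unaligned: true
    taskmanager.network.memory.buffer-debloat.enabled: true

If I remember correctly, with the default state.backend.rocksdb.memory.managed=true the managed-memory fraction is the main lever and the explicit cache/buffer sizes may be overridden by Flink's memory management, so it is worth double-checking the RocksDB memory docs for 1.17 before relying on them.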
>>
>> On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user <user@flink.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> We are facing backpressure in the Flink SQL job from the sink, and the backpressure comes from only a single task. This causes the checkpoint to fail despite enabling unaligned checkpoints and using debloating buffers. We enabled the flame graph, and the task spends most of its time doing RocksDB get and put. The SQL job does a left join over two streams with a parallelism of 20. The topics hold 540 GB of data for one topic and roughly 60 GB for the second. We are running 20 taskmanagers with 1 slot each, each taskmanager having 72 GB of memory and 9 CPUs.
>>> Can you provide any help on how to go about fixing the pipeline? We are using Flink 1.17.2. The issue is similar to this stackoverflow thread <https://stackoverflow.com/questions/77762119/flink-sql-job-stops-with-backpressure-after-a-week-of-execution>, except that instead of a week it starts facing backpressure as soon as the lag comes down to 4-5%.
>>>
>>> [image: image.png]
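On the per-operator parallelism point: plain Flink SQL indeed only exposes job-wide settings, but two table-level options may still be worth trying. A rough sketch for the SQL client follows; the option names are from the Flink documentation, while the values 40 and 12 h are assumptions to validate against the workload, and a state TTL is only applicable if expiring join state is acceptable, since rows arriving after the TTL will no longer find a match:

    -- Default parallelism for all SQL operators in this job (this includes the sources,
    -- so subtasks beyond the 20 source partitions would simply sit idle)
    SET 'table.exec.resource.default-parallelism' = '40';

    -- Bound the join state instead of keeping both topics (540 GB + 60 GB) in RocksDB forever;
    -- this changes the join semantics, so only use it if the business logic tolerates expiry
    SET 'table.exec.state.ttl' = '12 h';

Some sink connectors also accept an explicit 'sink.parallelism' option in their WITH (...) clause, which at least lets the sink vertex run with a different parallelism than the rest of the job.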