Hi Xuyang,

The input records are roughly balanced across subtasks. With buffer
debloating enabled, this subtask receives fewer records than the other
subtasks.

> If the differences among all subtasks are not significant, we might be
> encountering an IO bottleneck. In this case, we could try increasing the
> parallelism of this vertex, or, as Penny suggested, we could try to
> increase the TaskManager memory.


I checked the k8s metrics for the taskmanagers and I don't see any IO
issues; I can take another look at the metrics from the time we started the
backfill. Regarding parallelism, I don't think Flink SQL allows setting a
separate parallelism for a specific operator. I can try to increase the
parallelism for the entire job, although that will probably be
counterproductive for the sources, since the topics only have 20
partitions. Regarding memory, I have already given 72 GB of memory to each
taskmanager (and therefore to each slot, since 1 TM = 1 slot). We are
currently migrating join use cases from our in-house system built on top of
the DataStream API; that system never required this many resources, and we
never faced any such issues with this job there.
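
For reference, the only per-operator knob I'm aware of in pure SQL is the
sink parallelism. A minimal sketch follows; 'enriched_sink',
'enriched_view', and the value 40 are made up for illustration, and the
hint only helps for sink connectors that support the 'sink.parallelism'
option:

  -- SQL Client: set the default parallelism for all SQL operators of this
  -- job (the same key can also be set in flink-conf.yaml).
  SET 'table.exec.resource.default-parallelism' = '40';

  -- Override the parallelism of a single sink via a dynamic table option
  -- hint, assuming the connector supports 'sink.parallelism'.
  INSERT INTO enriched_sink /*+ OPTIONS('sink.parallelism' = '40') */
  SELECT * FROM enriched_view;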

On Tue, Jun 25, 2024 at 7:30 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Ashish.
>
> Can you confirm whether, on the subtask label page of this sink
> materializer node, the input records for each subtask are approximately the
> same?
>
> If the input records for subtask number 5 are significantly larger
> compared to the others, it signifies a serious data skew, and it would be
> necessary to modify the SQL appropriately to resolve this skew.
>
> If the differences among all subtasks are not significant, we might be
> encountering an IO bottleneck. In this case, we could try increasing the
> parallelism of this vertex, or, as Penny suggested, we could try to
> increase the TaskManager memory.
>
>
> --
>     Best!
>     Xuyang
>
>
> On 2024-06-24 21:28:58, "Penny Rastogi" <walls.fl...@gmail.com> wrote:
>
> Hi Ashish,
> Can you check a few things.
> 1. Is your source broker count also 20 for both topics?
> 2. You can try increasing the state operation memory to reduce the disk
> I/O:
>    - Increase the number of CU resources in a single slot.
>    - Set optimization parameters (example values are sketched below):
>      - taskmanager.memory.managed.fraction=x
>      - state.backend.rocksdb.block.cache-size=x
>      - state.backend.rocksdb.writebuffer.size=x
> 3. If possible, try a left window join for your streams.
> 4. Please share what sink you are using, and also the per-operator,
> source, and sink throughput, if possible.
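>
> For illustration only, in flink-conf.yaml these might look like the
> following; the values are placeholders, not recommendations, and need to
> be sized against the available TaskManager memory:
>
>   # Fraction of the TaskManager's memory reserved as managed memory,
>   # which the RocksDB state backend uses by default.
>   taskmanager.memory.managed.fraction: 0.6
>
>   # The two options below are, as far as I know, only honored when
>   # RocksDB does not use Flink-managed memory:
>   # state.backend.rocksdb.memory.managed: false
>   state.backend.rocksdb.block.cache-size: 256mb
>   state.backend.rocksdb.writebuffer.size: 128mb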
>
>
> On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user <
> user@flink.apache.org> wrote:
>
>> Hi all,
>>
>> We are facing backpressure in a Flink SQL job from the sink, and the
>> backpressure comes from only a single task. This causes checkpoints to
>> fail despite unaligned checkpoints and buffer debloating being enabled.
>> We enabled the flame graph, and the task spends most of its time doing
>> RocksDB gets and puts. The SQL job does a left join over two streams
>> with a parallelism of 20. One topic holds about 540 GB of data and the
>> second roughly 60 GB. We are running 20 taskmanagers with 1 slot each,
>> and each taskmanager has 72 GB of memory and 9 CPUs.
>> Can you provide any help on how to go about fixing the pipeline? We are
>> using Flink 1.17.2. The issue is similar to this stackoverflow thread
>> <https://stackoverflow.com/questions/77762119/flink-sql-job-stops-with-backpressure-after-a-week-of-execution>,
>> except that instead of after a week, it starts facing backpressure as
>> soon as the lag comes down to 4-5%.
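>>
>> To make the state size concrete, here is a minimal sketch of the
>> difference between an unbounded left join and a time-bounded (interval)
>> left join; the table and column names are made up, and the interval
>> variant assumes both sides carry event-time attributes (o.ts, e.ts):
>>
>>   -- Unbounded left join: both sides are kept in RocksDB state
>>   -- indefinitely unless a state TTL is configured.
>>   SELECT o.id, o.payload AS order_payload, e.payload AS enrich_payload
>>   FROM orders o
>>   LEFT JOIN enrichments e ON o.id = e.order_id;
>>
>>   -- Interval left join: state can be cleaned up once the time bound
>>   -- has passed.
>>   SELECT o.id, o.payload AS order_payload, e.payload AS enrich_payload
>>   FROM orders o
>>   LEFT JOIN enrichments e
>>     ON o.id = e.order_id
>>    AND e.ts BETWEEN o.ts - INTERVAL '1' HOUR AND o.ts + INTERVAL '1' HOUR;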
>>
>>
>
