Hi All,

To me it looks like something deadlocked, maybe due to this OOM error from
Kafka, preventing a Task from making any progress. To confirm, Dongwan, you
could collect stack traces while the job is in such a blocked state.
A Kafka deadlock could easily explain those symptoms, and it would be
visible as extreme back pressure. Another thing to look at would be
whether the job is making any progress at all (for example via the
numRecordsIn/numRecordsOut metrics [1]).
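
For the stack traces, running `jstack` against the TaskManager process while
the job is stuck is the usual route. If you would rather capture them from
inside the JVM, a minimal sketch using only the standard `java.lang.management`
API could look like the following. The class and method names
(`ThreadDumpSketch`, `dumpAllThreads`) are made up for illustration and are not
part of Flink.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of Flink: dumps the stacks of all live threads.
class ThreadDumpSketch {

    /** Returns a textual dump of every live thread in this JVM, with its state. */
    static String dumpAllThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false/false: skip locked-monitor and locked-synchronizer details,
        // so the call does not depend on optional JVM capabilities.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            sb.append('"').append(info.getThreadName()).append('"')
              .append(" state=").append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

In a dump taken while the job is blocked, the interesting part would be where
the task thread and the Kafka client threads are parked (for example a thread
in `WAITING` or `BLOCKED` state that never moves between two dumps).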

A couple of clarifications.

> What I suspect is the capacity of the asynchronous operation because
> limiting the value can cause back-pressure once the capacity is exhausted
> [1].
> Although I could increase the value (...)

If you want to decrease the impact of backpressure, you should decrease
the capacity, not increase it. The more in-flight records in the system,
the more records need to be processed or persisted in aligned or unaligned
checkpoints, respectively.

> As far as I can tell from looking at the code, the async operator is able
> to checkpoint even if the work-queue is exhausted.

Yes and no. If the work-queue is full, `AsyncWaitOperator` can be snapshotted,
but not while it is blocked inside the `AsyncWaitOperator#processElement`
method. For a checkpoint to be executed, `AsyncWaitOperator` must finish
processing the current record and return execution to the task thread. If
the work-queue is full, `AsyncWaitOperator` will block inside the
`AsyncWaitOperator#addToWorkQueue` method until the work-queue has
capacity to accept the new element. If what I suspect is happening here is
true, and the job is deadlocked via this Kafka issue, `AsyncWaitOperator`
will be blocked indefinitely in this method.
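
To illustrate the blocking behaviour described above, here is a deliberately
simplified model in plain Java. It is not Flink's implementation: a bounded
`ArrayBlockingQueue` stands in for the work-queue, one thread stands in for
the task thread, and an optional "completer" thread stands in for async
results coming back. All names (`BlockedTaskThreadSketch`, `checkpointRuns`)
are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model only; this does not reproduce AsyncWaitOperator's real code.
class BlockedTaskThreadSketch {

    /**
     * The "task thread" enqueues records into a bounded queue and can only run
     * the "checkpoint" after the last enqueue returns. Returns true if the
     * checkpoint ran within 500 ms.
     */
    static boolean checkpointRuns(int capacity, int records, boolean asyncWorkCompletes)
            throws InterruptedException {
        BlockingQueue<Integer> workQueue = new ArrayBlockingQueue<>(capacity);
        CountDownLatch checkpointDone = new CountDownLatch(1);

        Thread taskThread = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    workQueue.put(i); // blocks while the queue is full
                }
                checkpointDone.countDown(); // only reachable once control returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taskThread.setDaemon(true);

        // Stands in for async results completing and freeing queue slots.
        Thread completer = new Thread(() -> {
            try {
                while (true) {
                    workQueue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        completer.setDaemon(true);

        taskThread.start();
        if (asyncWorkCompletes) {
            completer.start();
        }

        boolean ran = checkpointDone.await(500, TimeUnit.MILLISECONDS);
        taskThread.interrupt();
        completer.interrupt();
        return ran;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("async work completes: " + checkpointRuns(2, 5, true));
        System.out.println("async work stuck:     " + checkpointRuns(2, 5, false));
    }
}
```

In this model, when nothing ever frees a slot (the second call), the task
thread stays parked in `put` and the checkpoint step is never reached, which
is the situation I suspect the Kafka issue is causing in the real job.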

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/ops/metrics/#io



On Tue, 9 Nov 2021 at 11:55, Fabian Paul <fabianp...@ververica.com> wrote:

> Hi Dongwan,
>
> Can you maybe share more about the setup and how you use the AsyncFunction
> with the Kafka client?
>
> As David already pointed out it could be indeed a Kafka bug but it could
> also mean that your defined async function leaks direct memory by not
> freeing some resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.
>
> Best,
> Fabian
