Hi All,

To me it looks like something is deadlocked, maybe due to this OOM error from Kafka, preventing a Task from making any progress. To confirm this, Dongwan, you could collect stack traces while the job is in such a blocked state. A deadlock in Kafka could easily explain those symptoms, and it would be visible as extreme back pressure. Another thing to check is whether the job is making any progress at all (for example via the numRecordsIn/numRecordsOut metrics [1]).
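For reference, a minimal JVM-level sketch of collecting stack traces in-process (roughly what `jstack <pid>` prints), using only the standard `java.lang.management` API; nothing here is a Flink API, and the class `ThreadDumps` and its helpers are hypothetical names. One caveat worth keeping in mind: a thread that waits forever for capacity that never frees up is not a lock cycle, so `findDeadlockedThreads()` will usually not flag it. The stack traces themselves (threads parked in `WAITING` inside a `put`/`await`-style call) are what to look for.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper: a jstack-like dump using the standard management API. */
final class ThreadDumps {

    /** Returns one block per live thread: name, state and full stack trace. */
    static String dumpAllThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // (false, false): skip lock details, only names, states and frames are printed
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null) {
                continue; // defensive: skip threads that could not be described
            }
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    /** True only for classic lock cycles (monitors or java.util.concurrent locks). */
    static boolean hasLockCycle() {
        long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        return ids != null && ids.length > 0;
    }

    public static void main(String[] args) {
        // Demo: a producer stuck on a full bounded queue that nobody drains.
        BlockingQueue<Integer> full = new ArrayBlockingQueue<>(1);
        full.add(0);
        Thread stuck = new Thread(() -> {
            try {
                full.put(1); // waits forever: nothing ever takes from the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "stuck-producer");
        stuck.setDaemon(true); // do not keep the JVM alive
        stuck.start();
        while (stuck.getState() != Thread.State.WAITING) {
            Thread.yield(); // wait until the producer is parked inside put()
        }
        System.out.print(dumpAllThreads());
        System.out.println("lock cycle detected: " + hasLockCycle());
    }
}
```

In the demo, `"stuck-producer" WAITING` shows up with `ArrayBlockingQueue.put` on its stack, while `hasLockCycle()` typically reports `false`, which is exactly why reading the traces matters more than automatic deadlock detection for this kind of stall.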
A couple of clarifications.

> What I suspect is the capacity of the asynchronous operation because limiting the value can cause back-pressure once the capacity is exhausted [1].
> Although I could increase the value (...)

If you want to decrease the impact of back pressure, you should decrease the capacity, not increase it. The more in-flight records there are in the system, the more records need to be processed/persisted in aligned/unaligned checkpoints.

> As far as I can tell from looking at the code, the async operator is able to checkpoint even if the work-queue is exhausted.

Yes and no. If the work-queue is full, `AsyncWaitOperator` can still be snapshotted, but only as long as it is not blocked inside the `AsyncWaitOperator#processElement` method. For a checkpoint to be executed, `AsyncWaitOperator` must finish processing the current record and return execution to the task thread. If the work-queue is full, `AsyncWaitOperator` will block inside the `AsyncWaitOperator#addToWorkQueue` method until the work-queue has capacity to accept the new element. If what I suspect is happening here is true, and the job is deadlocked via this Kafka issue, `AsyncWaitOperator` will be blocked indefinitely in this method.

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/ops/metrics/#io

On Tue, 9 Nov 2021 at 11:55, Fabian Paul <fabianp...@ververica.com> wrote:
> Hi Dongwan,
>
> Can you maybe share more about the setup and how you use the AsyncFunction with
> the Kafka client?
>
> As David already pointed out, it could indeed be a Kafka bug, but it could also
> mean that your defined async function leaks direct memory by not freeing some
> resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a follow-up.
>
> Best,
> Fabian
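The blocking behaviour of `AsyncWaitOperator#addToWorkQueue` described in the reply can be modelled with a plain bounded queue. This is a hypothetical, simplified model (the class and method names are made up, and it is not how Flink implements the operator): the task thread may only move on, for example to process a checkpoint barrier, once the enqueue returns, and if the external client never completes any request, no slot is ever freed. A timeout is used only so the sketch terminates; in the scenario above the wait would be indefinite.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model (not Flink code) of an async operator's bounded work queue. */
final class BoundedWorkQueueModel {

    /**
     * Returns how many records the "task thread" enqueues before it stalls.
     * capacity must be >= 1; it plays the role of the async operator's capacity.
     */
    static int recordsAcceptedBeforeStall(int capacity, int records, long stallTimeoutMs) {
        // Holds in-flight requests; nothing ever removes them because the
        // external client (the "deadlocked" Kafka client) never completes any.
        BlockingQueue<Integer> inFlight = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int record = 0; record < records; record++) {
                // Real code would wait indefinitely here; the timeout only detects the stall.
                if (!inFlight.offer(record, stallTimeoutMs, TimeUnit.MILLISECONDS)) {
                    break; // the task thread would still be stuck at this point
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(recordsAcceptedBeforeStall(3, 10, 50)); // prints 3
    }
}
```

With capacity 3 and no completions, the fourth record never gets a slot, so everything behind it (including any checkpoint barrier) waits too. Each parallel subtask has its own queue, so the capacity also bounds how many in-flight records per subtask a checkpoint has to deal with, consistent with the advice above to decrease rather than increase the capacity.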