I've changed KafkaFetcher (as GKafkaFetcher) to enter and exit the synchronized
block for each record. This inverted the behavior: now the Legacy Source thread
waits for checkpointLock, while the Source thread holds it and is requesting a
memory segment.

"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
    ...

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
    at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
    -  blocked on java.lang.Object@2af646cc
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
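
A simplified sketch of the lock-granularity change described above. It does not reproduce GKafkaFetcher or Flink's KafkaFetcher API; the class and method names (LockGranularitySketch, emitBatchUnderOneLock, emitPerRecord) are hypothetical, and a plain Object stands in for the checkpoint lock (the dump shows the lock is a java.lang.Object monitor). It only shows that per-record locking lets another thread in between records, while each individual emit still runs with the lock held.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: one lock for a whole batch vs. one lock entry per record. */
class LockGranularitySketch {
    // Stand-in for the task's checkpoint lock (a plain Object monitor).
    private final Object checkpointLock = new Object();
    // Counts how often the synchronized block is entered; for illustration only.
    private int lockEntries = 0;

    /** Coarse-grained: every record is emitted inside a single synchronized block. */
    void emitBatchUnderOneLock(List<String> records, Consumer<String> output) {
        synchronized (checkpointLock) {
            lockEntries++;
            for (String r : records) {
                output.accept(r); // would block here, lock held, if no buffer is free
            }
        }
    }

    /** Fine-grained: the synchronized block is entered and exited once per record. */
    void emitPerRecord(List<String> records, Consumer<String> output) {
        for (String r : records) {
            synchronized (checkpointLock) {
                lockEntries++;
                output.accept(r); // still runs with the lock held
            }
            // Between iterations another thread (e.g. the task thread) may take the lock.
        }
    }

    int lockEntries() { return lockEntries; }

    Object lock() { return checkpointLock; }
}
```

Finer granularity only changes who can get the lock between records. Whichever thread holds it while requesting a buffer still blocks the other one when no buffers are free, which is consistent with the inverted dump above.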

Thanks,
Alexey
________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Monday, March 22, 2021 1:36 AM
To: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
Cc: Alexey Trenikhun <yen...@msn.com>; Flink User Mail List <user@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(the checkpoint lock is held by a thread that is trying to emit but
cannot acquire any free buffers).
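
A minimal sketch of that situation, assuming a ReentrantLock as a stand-in for the checkpoint lock (so the second thread can use a timed tryLock; Flink itself uses a plain Object monitor) and a Semaphore as a stand-in for LocalBufferPool. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: a lock holder blocked on a buffer pool starves other lock users. */
class BackPressureSketch {
    /**
     * Starts an "emitter" that takes the lock and then waits for a buffer, and reports
     * whether the calling thread could acquire the same lock within the timeout.
     */
    static boolean otherThreadGetsLock(int freeBuffers, long timeoutMs) {
        ReentrantLock checkpointLock = new ReentrantLock(); // stand-in for the checkpoint lock
        Semaphore bufferPool = new Semaphore(freeBuffers);   // stand-in for LocalBufferPool
        CountDownLatch lockTaken = new CountDownLatch(1);

        Thread emitter = new Thread(() -> {
            checkpointLock.lock();
            try {
                lockTaken.countDown();
                bufferPool.acquire(); // blocks, like requestMemorySegmentBlocking, if no buffer is free
                bufferPool.release(); // record "emitted", buffer handed back
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                checkpointLock.unlock();
            }
        });
        emitter.start();

        boolean acquired = false;
        try {
            lockTaken.await(); // make sure the emitter owns the lock first
            // The other thread (e.g. one running a checkpoint action) tries the lock.
            acquired = checkpointLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                checkpointLock.unlock();
            }
            emitter.interrupt(); // unblock the emitter if it is still waiting for a buffer
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired;
    }
}
```

With one free buffer the lock is released quickly and the call returns true; with zero free buffers the emitter keeps the lock while it waits, so the call returns false, matching the BLOCKED/WAITING pair in the dump.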

The lock is per task, so there can be several locks per TM.
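
To illustrate why one TM can show several threads blocked on different Objects, here is a hypothetical sketch (class names invented for illustration) in which each task owns its own lock object: holding one task's lock does not stop a thread that synchronizes on another task's lock, while a thread contending for the same task's lock blocks.

```java
/** Hypothetical sketch: each task owns a separate checkpoint lock object. */
class PerTaskLockSketch {
    /** Stand-in for a task; only its lock object matters here. */
    static final class Task {
        final Object checkpointLock = new Object();
    }

    /**
     * While the caller holds {@code held}'s lock, starts a thread that enters
     * {@code other}'s lock and reports whether it finished within the timeout.
     */
    static boolean otherTaskProceeds(Task held, Task other, long timeoutMs) {
        synchronized (held.checkpointLock) {
            Thread t = new Thread(() -> {
                synchronized (other.checkpointLock) {
                    // critical section of the other task; empty in this sketch
                }
            });
            t.setDaemon(true); // do not keep the JVM alive if it stays blocked
            t.start();
            try {
                t.join(timeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !t.isAlive();
        }
    }
}
```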

@ChangZhuo Chen (陳昌倬), in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump).


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took a thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> >     -  blocked on java.lang.Object@5366a0e2
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at sun.misc.Unsafe.park(Native Method)
> >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it the checkpoint lock? Is the checkpoint lock per task or per TM? I see
> > multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing
> > blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case, described in [0]. Maybe it is
> the same issue?
>
> [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
