I also expected checkpointing to improve at the cost of throughput, but in 
reality I didn't notice any difference in either checkpointing or throughput. 
The backlog was purged by Kafka, so I can't post a thread dump right now, but 
I doubt that the problem is gone, so I will have another chance during the 
next performance run.

Thanks,
Alexey

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Tuesday, March 23, 2021 12:17 AM
To: Alexey Trenikhun <yen...@msn.com>
Cc: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>; Flink User Mail List 
<user@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is that what you see?
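
To illustrate the trade-off, here is a minimal sketch in plain Java. It is
not Flink code: the class name, the method, and the 1 ms pause standing in
for fetch work are all assumptions made for illustration. A "source" thread
emits a batch either under one long synchronized block or with the lock
taken per record, while a second thread plays the checkpoint and reports
how many records had been emitted when it finally got the lock. It does not
model a source that blocks on a network buffer while holding the lock.

```java
import java.util.concurrent.CountDownLatch;

// Plain-Java illustration only; none of these names are Flink APIs.
class CheckpointLockSketch {

    /**
     * Returns how many records the simulated source had emitted by the
     * time the simulated checkpoint thread managed to take the lock.
     */
    static int recordsBeforeCheckpoint(int batchSize, boolean lockPerRecord) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        final Object checkpointLock = new Object();
        final int[] emitted = {0};
        final int[] seenByCheckpoint = {-1};
        final CountDownLatch firstRecordEmitted = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            if (lockPerRecord) {
                for (int i = 0; i < batchSize; i++) {
                    synchronized (checkpointLock) {
                        emitted[0]++;
                    }
                    firstRecordEmitted.countDown();
                    pause(1); // simulated fetch work, done without the lock
                }
            } else {
                synchronized (checkpointLock) {
                    for (int i = 0; i < batchSize; i++) {
                        emitted[0]++;
                        firstRecordEmitted.countDown();
                        pause(1); // same work, but the lock stays held
                    }
                }
            }
        });

        Thread checkpoint = new Thread(() -> {
            try {
                firstRecordEmitted.await(); // start competing once the source is running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (checkpointLock) {
                seenByCheckpoint[0] = emitted[0];
            }
        });

        source.start();
        checkpoint.start();
        try {
            source.join();
            checkpoint.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seenByCheckpoint[0];
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("lock per batch:  " + recordsBeforeCheckpoint(20, false));
        System.out.println("lock per record: " + recordsBeforeCheckpoint(20, true));
    }
}
```

With the lock held across the whole batch, the checkpoint thread only gets
in after all records are emitted, so the call returns batchSize. With
per-record locking it usually gets in much earlier, though the exact number
depends on thread scheduling, and the source pays for a lock acquire and
release on every record.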

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x000000002af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?
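
If attaching jstack output is inconvenient, a dump that includes the locked
monitors can also be produced from inside the JVM. The sketch below uses the
standard java.lang.management API; the class and method names are
hypothetical, and the output format is only an approximation of what jstack
prints. Unlike ThreadInfo.toString(), which as far as I know prints only the
first few frames of each thread, it writes out every stack frame.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper; only the java.lang.management calls are real JDK APIs.
class FullThreadDumpSketch {

    /** Builds a text dump of all live threads, including locked monitors. */
    static String dump() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // true, true: also collect locked monitors and ownable synchronizers.
        for (ThreadInfo info : bean.dumpAllThreads(true, true)) {
            out.append('"').append(info.getThreadName()).append('"')
               .append(" Id=").append(info.getThreadId())
               .append(' ').append(info.getThreadState());
            if (info.getLockName() != null) {
                out.append(" on ").append(info.getLockName());
            }
            if (info.getLockOwnerName() != null) {
                out.append(" owned by \"").append(info.getLockOwnerName()).append('"');
            }
            out.append('\n');
            // Monitors this thread currently holds, e.g. a checkpoint lock object.
            for (MonitorInfo monitor : info.getLockedMonitors()) {
                out.append("    - locked ").append(monitor)
                   .append(" (stack depth ").append(monitor.getLockedStackDepth()).append(")\n");
            }
            // Full stack trace, without truncation.
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

A thread that holds the lock shows a "- locked java.lang.Object@..." line
under its header, which is the piece of information that tells us which
thread owns the checkpoint lock.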



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun <yen...@msn.com> wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized 
> block on each record, which inverted the behavior: now the Legacy Source 
> thread waits for checkpointLock, while the Source thread is requesting a 
> memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on 
> java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at sun.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>     ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 
> BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> 
> Filter (6/6)#0" Id=199
>     at 
> com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
>     -  blocked on java.lang.Object@2af646cc
>     at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>     at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
> Cc: Alexey Trenikhun <yen...@msn.com>; Flink User Mail List 
> <user@flink.apache.org>
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬), in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump).
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> 
> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took a thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > >     at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > >     -  blocked on java.lang.Object@5366a0e2
> > >     at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > >     at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > >     at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > >     at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >
> > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" 
> > > Id=202 WAITING on 
> > > java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     -  waiting on 
> > > java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >     at 
> > > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >     at 
> > > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >     at 
> > > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >     at 
> > > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > >     at 
> > > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > >     at 
> > > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > >
> > > Is it the checkpoint lock? Is the checkpoint lock per task or per TM? I 
> > > see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing 
> > > blocked on different Objects.
> >
> > Hi,
> >
> > This call stack is similar to our case as described in [0]. Maybe they
> > are the same issue?
> >
> > [0] 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
> >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
