Because of operator chaining, I only have one operator, and it does not
show any backpressure. There is no option for disabling the chaining.

This same setup used to work with the 1.9 runner, so I am wondering what has
changed in the 1.10 runner.

________________________________
From: Maximilian Michels <m...@apache.org>
Sent: Friday, September 18, 2020 7:08 AM
To: user@beam.apache.org <user@beam.apache.org>; Deshpande, Omkar 
<omkar_deshpa...@intuit.com>
Subject: Re: flink runner 1.10 checkpoint timeout issue

This type of stack trace occurs when the downstream operator is blocked
for some reason. Flink maintains a finite number of network buffers for
each network channel. If the receiving downstream operator does not
process incoming network buffers, the upstream operator blocks. This is
called backpressure, and it is a useful feature for avoiding data congestion.

I would check the stack traces downstream to find the cause of the
backpressure.
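
As a rough illustration of the buffer mechanics described above, here is a
minimal sketch that uses a plain bounded queue. It is not Flink's actual
LocalBufferPool code; the class name BackpressureSketch and the helper
recordsAcceptedBeforeStall are made up for this example, and the consumer is
assumed never to drain the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {

    // Hypothetical helper: counts how many records a producer can emit into
    // a bounded pool of `bufferCount` buffers when the consumer never drains it.
    static int recordsAcceptedBeforeStall(int bufferCount, int recordsToSend) {
        BlockingQueue<Integer> buffers = new ArrayBlockingQueue<>(bufferCount);
        int accepted = 0;
        for (int record = 0; record < recordsToSend; record++) {
            // offer() returns false when the queue is full; a blocking
            // put() would park the producer thread at this point instead.
            if (!buffers.offer(record)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Four buffers, ten records, nobody consuming.
        System.out.println(recordsAcceptedBeforeStall(4, 10));
    }
}
```

With a blocking put() in place of offer(), the producer thread would stay
parked once the buffers are full, which is roughly the situation a thread
waiting in requestMemorySegmentBlocking would be in.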

-Max

On 17.09.20 19:50, Deshpande, Omkar wrote:
> Flink 1.10
> ------------------------------------------------------------------------
> *From:* Kyle Weaver <kcwea...@google.com>
> *Sent:* Thursday, September 17, 2020 9:34 AM
> *To:* user@beam.apache.org <user@beam.apache.org>
> *Subject:* Re: flink runner 1.10 checkpoint timeout issue
>
> What is the version of your Flink cluster?
>
> On Wed, Sep 16, 2020 at 9:10 PM Deshpande, Omkar
> <omkar_deshpa...@intuit.com> wrote:
>
>     Hello,
>
>     I recently upgraded to beam-flink-runner-1.10:2.23.0 from
>     beam-flink-runner-1.9:2.23.0. My application was working as expected
>     with the 1.9 runner, but after upgrading, the checkpoints are timing
>     out. Even after increasing the timeout significantly, the checkpoints
>     keep failing. I looked at the thread dump to check for deadlocks.
>     There are none, but this thread seems to have been in the awaiting
>     notification state for a long time:
>
>     Legacy Source Thread - Source:
>     read/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
>     read/Remove Kafka Metadata/ParMultiDo(Anonymous) -> Random key
>     assignment SPP/ParMultiDo(RandomPartitioner) -> Window for
>     repartitioning SPP/Window.Assign.out -> ToKeyedWorkItem (1/4)
>     awaiting notification on [ 0x00000007b83b7958 ], holding [ 0x00000007bc786fd8 ]
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>
>
>     My application is I/O bound, i.e. every record makes a REST call that
>     takes a few seconds to complete.
>     I did not face this issue with the 1.9 runner. What has changed in the
>     1.10 runner? Any pointers for debugging?
>
>     Omkar
>
