Hi Dongwon,

Did you override AsyncFunction#timeout()? If so, did you call
resultFuture.complete() or completeExceptionally() in your override? Not
calling one of them can result in checkpoint timeouts.
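
For example, an override that still completes the future could look roughly 
like the sketch below (the types and the fallback behavior are placeholders; 
if I remember correctly, the default implementation simply fails the future 
with a TimeoutException):

import java.util.Collections;
import java.util.concurrent.TimeoutException;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class MyAsyncFunction implements AsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // issue the async request and complete resultFuture in its callback
        resultFuture.complete(Collections.singleton(input)); // placeholder
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // The future must be completed here as well; otherwise the queue
        // entry never finishes.
        resultFuture.completeExceptionally(
                new TimeoutException("Async request timed out for: " + input));
        // or, to drop the element instead of failing the job:
        // resultFuture.complete(Collections.emptyList());
    }
}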

Thanks
Jun


> On Nov 9, 2021, at 7:37 AM, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi David,
> 
> > There are currently no metrics for the async work-queue size (you should be
> > able to see the queue stats with debug logs enabled though [1]).
> Thanks for the input, but scraping DEBUG messages into, say, Elasticsearch
> for monitoring in Grafana is not possible in my current environment.
> I just defined two counters in my RichAsyncFunction, one tracking the number
> of sent requests and the other the number of finished/failed requests, and
> used the difference between the two to derive the number of in-flight
> requests in Prometheus.
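> 
> Roughly like this minimal sketch (the counter names and the external client
> call are placeholders):
> 
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Counter;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> 
> public class MyAsyncFunction extends RichAsyncFunction<String, String> {
> 
>     private transient Counter sentRequests;
>     private transient Counter finishedRequests;
> 
>     @Override
>     public void open(Configuration parameters) {
>         sentRequests = getRuntimeContext().getMetricGroup().counter("sentRequests");
>         finishedRequests = getRuntimeContext().getMetricGroup().counter("finishedRequests");
>     }
> 
>     @Override
>     public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>         sentRequests.inc();
>         queryExternalSystem(input).whenComplete((result, error) -> {
>             finishedRequests.inc(); // counted for both success and failure
>             if (error != null) {
>                 resultFuture.completeExceptionally(error);
>             } else {
>                 resultFuture.complete(Collections.singleton(result));
>             }
>         });
>     }
> 
>     // placeholder for the actual call to the external system
>     private CompletableFuture<String> queryExternalSystem(String key) {
>         return CompletableFuture.completedFuture(key);
>     }
> }
> 
> The in-flight count is then just the difference between the two counters on
> the Prometheus side.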
> 
> > As far as I can tell from looking at the code, the async operator is able
> > to checkpoint even if the work-queue is exhausted.
> Oh, I didn't know that! As you pointed out, and as I'm going to explain
> below, the async operator might not be the source of the problem.
> 
> I just hit the same situation and found that:
> - the number of in-flight records is zero when the backpressure is getting high
> - one TaskManager logs the following error around the time the backpressure
>   gets high (the other TaskManagers don't):
> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-producer-network-thread | producer-8':
> java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>         at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>         at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>         at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>         at java.nio.channels.SocketChannel.write(SocketChannel.java:507) ~[?:?]
>         at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>         at java.lang.Thread.run(Thread.java:829) [?:?]
> 
> Can this be the reason why my pipeline stalls and ends up with the checkpoint
> timeout? I guess all the upstream tasks might fail to send data to the task
> with the failed Kafka producer, records would pile up in the buffers, and
> that could result in the back-pressure. If so, is there no mechanism in Flink
> to detect such an error and report it to the JobManager for debugging
> purposes?
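> 
> For what it's worth, a crude workaround I can think of for watching direct
> memory from user code is registering a gauge over the JVM's BufferPoolMXBean,
> roughly like the sketch below (the class and metric names are made up, and
> Flink's built-in Status.JVM.Memory.Direct metrics might already expose the
> same numbers):
> 
> import java.lang.management.BufferPoolMXBean;
> import java.lang.management.ManagementFactory;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Gauge;
> 
> // Pass-through operator whose only job is to expose the direct buffer pool
> // usage of its TaskManager as a gauge (e.g. for Prometheus).
> public class DirectMemoryProbe<T> extends RichMapFunction<T, T> {
> 
>     @Override
>     public void open(Configuration parameters) {
>         for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
>             if ("direct".equals(pool.getName())) {
>                 getRuntimeContext().getMetricGroup()
>                         .gauge("directMemoryUsed", (Gauge<Long>) pool::getMemoryUsed);
>             }
>         }
>     }
> 
>     @Override
>     public T map(T value) {
>         return value; // records are passed through unchanged
>     }
> }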
> 
> Best,
> 
> Dongwon
> 
> 
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek <d...@apache.org> wrote:
> Hi Dongwon,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). As far as I 
> can tell from looking at the code, the async operator is able to checkpoint 
> even if the work-queue is exhausted.
> 
> Arvid, can you please validate the above (the part about checkpoints not
> being blocked by the work queue)?
> 
> [1] https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
> 
> Best,
> D.
> 
> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi community,
> 
> While using Flink's async i/o to interact with an external system, I ran into
> the following checkpoint timeout:
> 
> 2021-11-06 10:38:35,270 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job f168a44ea33198cd71783824d49f9554.
> 2021-11-06 10:38:47,031 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 54 for job f168a44ea33198cd71783824d49f9554 (11930992707 bytes, checkpointDuration=11722 ms, finalizationTime=47 ms).
> 2021-11-06 10:58:35,270 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 55 (type=CHECKPOINT) @ 1636163915262 for job f168a44ea33198cd71783824d49f9554.
> 2021-11-06 11:08:35,271 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 55 of job f168a44ea33198cd71783824d49f9554 expired before completing.
> 2021-11-06 11:08:35,287 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
> 
> - FYI, I'm using 1.14.0 with unaligned checkpointing and buffer debloating enabled
> - the 55th checkpoint failed to complete within 10 minutes (the value of execution.checkpointing.timeout)
> - the graph below shows that the backpressure skyrocketed around the time the 55th checkpoint began
> <image.png>
> 
> What I suspect is the capacity of the asynchronous operation, because
> limiting that value can cause back-pressure once the capacity is exhausted
> [1].
> 
> Although I could simply increase the value, I'd like to monitor the current
> number of in-flight async i/o requests on Grafana, just like the
> back-pressure graph above, but [2] does not list any system metric specific
> to async i/o.
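> 
> For reference, the capacity I'm referring to is the last argument used when
> wiring up the async operator. A minimal sketch, assuming an
> AsyncFunction<String, String> called MyAsyncFunction (the name, timeout, and
> capacity values are just placeholders):
> 
> import java.util.concurrent.TimeUnit;
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class AsyncCapacityExample {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<String> input = env.fromElements("a", "b", "c");
> 
>         DataStream<String> results =
>                 AsyncDataStream.unorderedWait(
>                         input,
>                         new MyAsyncFunction(),  // hypothetical AsyncFunction<String, String>
>                         30, TimeUnit.SECONDS,   // per-request timeout
>                         100);                   // capacity: max concurrent in-flight requests
> 
>         results.print();
>         env.execute("async capacity example");
>     }
> }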
> 
> Best,
> 
> Dongwon
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> [2] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics
> 
> 
