Hi Jun,

Did you override AsyncFunction#timeout()?  If so, did you call
> resultFuture.complete()/completeExceptionally() in your override?  Not
> calling them can result in checkpoint timeout.

No, I've only called resultFuture.complete() in AsyncFunction.asyncInvoke()
and didn't know much about AsyncFunction.timeout(). However, looking at the
default implementation of AsyncFunction.timeout(), I'd rather get the timeout
exception, as I want my streaming pipeline to fail fast anyway.
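
Just to make sure I understand, a minimal sketch of what you mean would be
something like the following (the class and the supplyAsync call are
placeholders, not my actual code): the default timeout() already fails the
future with a TimeoutException, while a custom override has to complete the
future itself, or the element stays in-flight and checkpoints can stall:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MyAsyncFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> input) // placeholder for the actual external call
            .whenComplete((result, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singleton(result));
                }
            });
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // An override must still complete the future; otherwise the element never
        // leaves the async operator and checkpoints can time out. Failing it, as
        // the default implementation does, gives the fail-fast behavior I want.
        resultFuture.completeExceptionally(
                new TimeoutException("Async request timed out for input: " + input));
    }
}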

I think I understand what you are concerned about, but, as I wrote in my
reply to David, the problem seems to be the Kafka sink that I recently added
to the pipeline to write large records (~10MB) for debugging purposes.

Anyway, thanks a lot for pointing out the possibility of overriding
AsyncFunction.timeout().

Best,

Dongwon

On Tue, Nov 9, 2021 at 5:53 PM Jun Qin <qinjunje...@gmail.com> wrote:

> Hi Dongwon
>
> Did you override AsyncFunction#timeout()?  If so, did you call
> resultFuture.complete()/completeExceptionally() in your override?  Not
> calling them can result in checkpoint timeout.
>
> Thanks
> Jun
>
>
> On Nov 9, 2021, at 7:37 AM, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi David,
>
> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]).
>
> Thanks for the input, but scraping DEBUG messages into, for example,
> Elasticsearch for monitoring on Grafana is not possible in my current
> environment.
> I just defined two counters in my RichAsyncFunction, one tracking the number
> of sent requests and the other the number of finished/failed requests, and I
> use the difference between them to track the in-flight requests in Prometheus.
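>
> Roughly, what I did looks like the sketch below (just a sketch; the class and
> metric names are arbitrary, and the supplyAsync call stands in for our real
> external request):
>
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Counter;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
>
> public class CountingAsyncFunction extends RichAsyncFunction<String, String> {
>
>     // two user-defined counters exposed through Flink's metric system
>     private transient Counter sentRequests;
>     private transient Counter finishedRequests;
>
>     @Override
>     public void open(Configuration parameters) {
>         sentRequests = getRuntimeContext().getMetricGroup().counter("sentRequests");
>         finishedRequests = getRuntimeContext().getMetricGroup().counter("finishedRequests");
>     }
>
>     @Override
>     public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>         sentRequests.inc();
>         CompletableFuture
>             .supplyAsync(() -> input) // placeholder for the real external call
>             .whenComplete((result, error) -> {
>                 finishedRequests.inc(); // counted whether the call succeeded or failed
>                 if (error != null) {
>                     resultFuture.completeExceptionally(error);
>                 } else {
>                     resultFuture.complete(Collections.singleton(result));
>                 }
>             });
>     }
> }
>
> The in-flight count on Grafana is then just sentRequests - finishedRequests.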
>
> As far as I can tell from looking at the code, the async operator is able
>> to checkpoint even if the work-queue is exhausted.
>
> Oh, I didn't know that! As you pointed out, and as I explain below, the
> async operator might not be the source of the problem.
>
> I just hit the same situation and found that
> - the number of in-flight records is zero when the backpressure is getting high
> - a taskmanager complains with the following error message around the time when
> the backpressure gets high (none of the others do):
>
>> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread
>>                   [] - Uncaught exception in thread
>> 'kafka-producer-network-thread | producer-8':
>> java.lang.OutOfMemoryError: Direct buffer memory
>>         at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> ~[?:?]
>>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>>         at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>>         at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>>         at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>>         at
>> sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>>         at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
>> ~[?:?]
>>         at
>> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>         at java.lang.Thread.run(Thread.java:829) [?:?]
>>
>
> Can this be the reason why my pipeline is stalled and ends up with the
> checkpoint timeout? I guess all the upstream tasks might fail to send data to
> the failed Kafka producer, so records stack up in buffers, which could result
> in the back-pressure. If so, is there no mechanism in Flink to detect such an
> error and report it to the job manager for debugging purposes?
>
> Best,
>
> Dongwon
>
>
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek <d...@apache.org> wrote:
>
>> Hi Dongwon,
>>
>> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]). As far
>> as I can tell from looking at the code, the async operator is able to
>> checkpoint even if the work-queue is exhausted.
>>
>> Arvid can you please validate the above? (the checkpoints not being
>> blocked by the work queue part)
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
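>>
>> For reference, enabling that debug output should only need a logger entry in
>> conf/log4j.properties (assuming the default log4j2 setup), roughly:
>>
>> # sketch: enable DEBUG for the async work-queue package
>> logger.asyncqueue.name = org.apache.flink.streaming.api.operators.async.queue
>> logger.asyncqueue.level = DEBUG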
>>
>> Best,
>> D.
>>
>> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi community,
>>>
>>> While using Flink's async i/o for interacting with an external system, I
>>> got the following exception:
>>>
>>> 2021-11-06 10:38:35,270 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - 
>>> Triggering checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job 
>>> f168a44ea33198cd71783824d49f9554.
>>> 2021-11-06 10:38:47,031 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
>>> checkpoint 54 for job f168a44ea33198cd71783824d49f9554 (11930992707 bytes, 
>>> checkpointDuration=11722 ms, finalizationTime=47 ms).
>>> 2021-11-06 10:58:35,270 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - 
>>> Triggering checkpoint 55 (type=CHECKPOINT) @ 1636163915262 for job 
>>> f168a44ea33198cd71783824d49f9554.
>>> 2021-11-06 11:08:35,271 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - 
>>> Checkpoint 55 of job f168a44ea33198cd71783824d49f9554 expired before 
>>> completing.
>>> 2021-11-06 11:08:35,287 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
>>>                [] - Trying to recover from a global failure.
>>>
>>>
>>> - FYI, I'm using 1.14.0 and have enabled unaligned checkpointing and buffer
>>> debloating
>>> - the 55th checkpoint failed to complete within 10 mins (the value of
>>> execution.checkpointing.timeout)
>>> - the graph below shows that backpressure skyrocketed around the time
>>> the 55th checkpoint began
>>> <image.png>
>>>
>>> What I suspect is the capacity of the asynchronous operation, because
>>> limiting that value can cause back-pressure once the capacity is exhausted
>>> [1].
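>>> To be concrete, the capacity I mean is the last argument when attaching the
>>> async function to the stream (the values below are only examples, not my
>>> actual settings):
>>>
>>> // sketch (imports: AsyncDataStream, java.util.concurrent.TimeUnit)
>>> DataStream<String> results =
>>>     AsyncDataStream.unorderedWait(
>>>         input,                  // the upstream DataStream<String>
>>>         new MyAsyncFunction(),  // the RichAsyncFunction doing the external calls
>>>         30, TimeUnit.SECONDS,   // timeout per request
>>>         100);                   // capacity: max number of in-flight requests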
>>>
>>> Although I could increase the value, I'd like to monitor the current number
>>> of in-flight async i/o requests on Grafana, like the back-pressure graph
>>> above. [2] does not list any system metric specific to async i/o.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics
>>>
>>>
>>>
>
