Hi,

If your async operations stall, this will eventually cause problems. Either 
the sources will be backpressured (once the async operator’s queue becomes 
full) or you will run out of memory (if you configured the queue’s capacity 
too high). I think the only possible solutions are to either drop records in 
some way, or to spill them to some storage for later processing (assuming 
that the storage will not overflow or cause stalls of its own).
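
For the record-dropping option, one way to do it (a minimal sketch, assuming 
your callout is implemented as an AsyncFunction with String in/out types) is 
to override the timeout() method, which by default fails the job, so that 
timed-out elements are simply dropped:

    // Inside your AsyncFunction implementation; the String types are just
    // examples. The default timeout() completes the future exceptionally,
    // which fails the job.
    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Completing with an empty collection drops the element; consider
        // adding a metric or log line here so drops stay visible.
        resultFuture.complete(Collections.emptyList());
    }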

Regarding the Kafka offsets: as you wrote, Flink’s KafkaConsumer does not use 
the offsets committed to Kafka for recovery - for that purpose the offsets are 
stored inside Flink’s state.
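
For example (a minimal sketch; the topic, group id and interval are made up): 
with checkpointing enabled, recovery always uses the offsets in Flink’s state, 
and committing offsets back to Kafka is only useful for external monitoring:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60s

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group"); // hypothetical group id

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    // Affects only what external tools see; recovery still uses Flink state.
    consumer.setCommitOffsetsOnCheckpoints(true);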

Regarding checkpointing, you can read about how it is done in general in the 
docs [1]. Once barrier alignment for the async operator is complete, it 
checkpoints its state. Part of this state is the queue of elements that are 
currently being processed asynchronously. So if a failure happens, after 
recovery all of the operators (sources, async operator, sinks, …) are restored 
to effectively the same logical point in time. In the case of the async 
operator, the async operations that were caught in the middle of processing 
when the checkpoint barriers arrived are resubmitted/retried.
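
Because of this resubmission, your external call may be executed more than 
once for the same record, so it should be idempotent or otherwise retry-safe. 
A minimal sketch of such a function (class and call names are made up; the 
thread pool of size 20 matches the setup you describe):

    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class ExternalLookupFunction extends RichAsyncFunction<String, String> {

        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newFixedThreadPool(20);
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            executor.submit(() -> {
                // callExternalSystem() stands in for the real client call;
                // it may run again for the same record after recovery.
                String response = callExternalSystem(input);
                resultFuture.complete(Collections.singletonList(response));
            });
        }

        @Override
        public void close() {
            executor.shutdown();
        }

        private String callExternalSystem(String input) {
            return input; // placeholder for the actual external call
        }
    }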

I hope that answers your questions :)

Piotrek 

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html

> On 30 Jun 2019, at 04:47, wang xuchen <ben....@gmail.com> wrote:
> 
> Hi Flink experts,
> 
> I am prototyping a real-time system that reads from a Kafka source with Flink 
> and calls out to an external system as part of the event processing. One of 
> the most important requirements is that reads from Kafka should NEVER stall, 
> even in the face of slowness in some async external calls while certain Kafka 
> offsets are being held. At-least-once processing is good enough.
> 
> Currently, I am using AsyncIO with a thread pool of size 20. My understanding 
> is that if I use orderedWait with a large 'capacity', consumption from Kafka 
> should continue even if some external calls experience slowness (holding the 
> offsets), as long as the capacity is not exhausted.
> 
> (From my own reading of the Flink source code, the 'capacity' of the 
> orderedWait function translates to the size of the OrderedStreamElementQueue.)
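> 
> For reference, my wiring looks roughly like this (names simplified):
> 
>     DataStream<String> results = AsyncDataStream.orderedWait(
>         kafkaStream,                 // stream from the Kafka source
>         new ExternalCallFunction(),  // the async callout
>         30, TimeUnit.SECONDS,        // per-element timeout
>         10_000);                     // 'capacity' == queue size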
> 
> However, I expect that while the external calls are stuck, the stream source 
> should keep pumping from Kafka as long as there is still capacity, but offsets 
> after the stuck record should NOT be committed back to Kafka (and the 
> checkpoint should also stall to accommodate the stalled offsets?)
> 
> My observation is that if I set the capacity large enough (max_int / 100, for 
> instance), consumption was not stalled (which is good), but the offsets were 
> all committed back to Kafka AFTER the stalled records, all checkpoints 
> succeeded, and no backpressure was incurred.
> 
> In this case, if some machines crash, how does Flink recover the stalled 
> offsets? Which checkpoint does Flink roll back to? I understand that 
> committing offsets back to Kafka is merely to show progress to external 
> monitoring tools, but I hope Flink does bookkeeping somewhere to journal that 
> async call xyz has not returned and should be retried during recovery.
> 
> Thanks a lot
> Ben
> 
