Thanks Gen, I will look into a customized Source and SplitEnumerator.
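Below is a minimal plain-Java sketch of the design suggested in Gen's reply. It is not Flink code. `FileSplitSketch`, `FileEnumerator`, `LineReader`, `position()` and `run` are hypothetical names, not Flink's `SplitEnumerator`/`SourceReader` interfaces, and an in-memory map stands in for the S3 files. It shows the two properties that matter. First, the reader returns at most one line per `pollNext()` call, so its position is well defined between calls and a checkpoint can happen there. Second, enrichment runs as a separate map step after reading.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are NOT Flink's
// SplitEnumerator / SourceReader interfaces.
final class FileSplitSketch {

    /** Hypothetical enumerator: holds the discovered files and hands out one per request. */
    static final class FileEnumerator {
        private final Deque<String> pending;

        FileEnumerator(List<String> files) {
            this.pending = new ArrayDeque<>(files);
        }

        /** Returns the next unassigned file, or null when none are left. */
        String nextSplit() {
            return pending.poll();
        }
    }

    /** Hypothetical reader: returns at most one line per pollNext() call. */
    static final class LineReader {
        private final FileEnumerator enumerator;
        private final Map<String, List<String>> contents; // stands in for S3 reads
        private Iterator<String> lines = Collections.emptyIterator();
        private String currentFile = null;
        private long offset = 0; // lines consumed from currentFile

        LineReader(FileEnumerator enumerator, Map<String, List<String>> contents) {
            this.enumerator = enumerator;
            this.contents = contents;
        }

        /** Returns the next line, or null once every file has been read. */
        String pollNext() {
            while (!lines.hasNext()) {
                currentFile = enumerator.nextSplit();
                if (currentFile == null) {
                    return null;
                }
                lines = contents.getOrDefault(currentFile, Collections.<String>emptyList()).iterator();
                offset = 0;
            }
            offset++;
            return lines.next();
        }

        /** The reader state a checkpoint would need: current file and line offset. */
        String position() {
            return currentFile + ":" + offset;
        }
    }

    /** Reads every line, then enriches it in a separate map-like step. */
    static List<String> run(List<String> files, Map<String, List<String>> contents,
                            Function<String, String> enrich) {
        LineReader reader = new LineReader(new FileEnumerator(files), contents);
        List<String> out = new ArrayList<>();
        String line;
        while ((line = reader.pollNext()) != null) {
            // Between two pollNext() calls the reader's position() is well defined,
            // so the reader's state for a checkpoint taken here is just that position.
            out.add(enrich.apply(line));
        }
        return out;
    }
}
```

In a real job these roles would be filled by implementations of Flink's FLIP-27 Source API, where `SourceReader#pollNext` takes a `ReaderOutput` and returns an `InputStatus` rather than a line; the exact signatures should be checked against the documentation for the Flink version in use.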

On Mon, Mar 7, 2022 at 10:20 PM Gen Luo <luogen...@gmail.com> wrote:

> Hi Diwakar,
>
> An asynchronous flatMap function without support from the framework can
> be problematic: you should not call collector.collect outside the main
> thread of the task, i.e. outside the flatMap method.
>
> I'd suggest using a customized Source to process the files instead. It
> uses a SplitEnumerator to discover the files and SourceReaders to read
> them. This way, checkpoints can be triggered between two calls of
> pollNext, so you don't have to implement the reading asynchronously. It
> would be better if the readers only read the lines and the records were
> then enriched in a following map function.
>
>
>
> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My use
>> case is reading files from an S3 bucket, filtering the file contents (say,
>> records), and enriching each record. Then I filter the records and output
>> them to a sink. I'm reading 6k files per 15 minutes, and the total number
>> of records is 3 billion per 15 minutes. I'm using a flatMap operator to
>> convert each file into records and enrich the records in a synchronous call.
>>
>> *Problem*: My application fails to run (checkpoint timeout) if I add
>> more filter criteria (operators). I suspect the system is not able to scale
>> (CPU utilization is still at 20%) because of the synchronous call. I want
>> to convert this flatMap to an asynchronous call using AsyncFunction. I was
>> looking for something like AsyncCollector.collect
>> <https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.html#collect-java.util.Collection->
>> in AsyncFunction
>> <https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html>
>> to complement my current synchronous flatMap implementation, but it seems
>> this is not available in Flink 1.11.
>>
>> *Question*:
>> Could someone please help me convert this flatMap operation to an
>> asynchronous call?
>>
>> Please let me know if you have any questions.
>>
>> Best,
>>
>
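The figures in the original message translate into the following sustained rates, assuming records are spread evenly over each 15-minute (900 s) window:

$$
\frac{3\times10^{9}\ \text{records}}{900\ \text{s}} \approx 3.33\times10^{6}\ \text{records/s},\qquad
\frac{6000\ \text{files}}{900\ \text{s}} \approx 6.67\ \text{files/s},\qquad
\frac{3\times10^{9}\ \text{records}}{6000\ \text{files}} = 5\times10^{5}\ \text{records/file}.
$$

As a hypothetical example, if each enrichment call took 1 ms, keeping up with about 3.33 million records/s would need roughly $3.33\times10^{6} \times 0.001 \approx 3{,}300$ calls in flight at once (Little's law). With one blocking call per record per subtask, most threads would spend their time waiting rather than computing, which is consistent with low CPU utilization while checkpoints time out.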

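On the two asynchronous-call points in this thread: in Flink releases after 1.3, including 1.11, the role of `AsyncCollector` is played by `ResultFuture`. An `AsyncFunction` implements `asyncInvoke(IN input, ResultFuture<OUT> resultFuture)` and calls `resultFuture.complete(...)` with a collection of results. It is attached to a stream with `AsyncDataStream.orderedWait(...)` or `unorderedWait(...)`, which take a timeout and a capacity (maximum requests in flight). The plain-Java sketch below does not use Flink. It only illustrates the behavior those operators provide: bounded in-flight calls, results in input order, and emission on the caller's own thread, never from a callback thread, which is the rule Gen points out for `collector.collect`. `OrderedAsyncEnricher` and `enrichAll` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper for illustration; it does not use or replace Flink's
// AsyncFunction / AsyncDataStream.
final class OrderedAsyncEnricher {

    /**
     * Runs `enrich` for every record on a thread pool with at most `capacity`
     * calls in flight, and passes results to `emit` in input order.
     * `emit` is only ever invoked on the calling thread. If an enrichment
     * call fails, join() rethrows it as a CompletionException.
     */
    static <I, O> void enrichAll(List<I> records, Function<I, O> enrich,
                                 int capacity, int threads, Consumer<O> emit) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Deque<CompletableFuture<O>> inFlight = new ArrayDeque<>();
            for (I record : records) {
                if (inFlight.size() == capacity) {
                    // Back-pressure: wait for the oldest call before starting a new one.
                    emit.accept(inFlight.poll().join());
                }
                inFlight.add(CompletableFuture.supplyAsync(() -> enrich.apply(record), pool));
            }
            // Drain the calls that are still pending, again in input order.
            while (!inFlight.isEmpty()) {
                emit.accept(inFlight.poll().join());
            }
        } finally {
            // Let the worker threads exit once their queued work is done.
            pool.shutdown();
        }
    }
}
```

Emitting in input order mirrors `orderedWait`; emitting whichever call finishes first would correspond to `unorderedWait`, which can reduce latency when record order does not matter.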