Thanks Gen, I will look into a customized Source and SplitEnumerator.

On Mon, Mar 7, 2022 at 10:20 PM Gen Luo <luogen...@gmail.com> wrote:
> Hi Diwakar,
>
> An asynchronous flatMap function without support from the framework can
> be problematic. You should not call collector.collect outside the main
> thread of the task, i.e. outside the flatMap method.
>
> I'd suggest using a customized Source instead to process the files, which
> uses a SplitEnumerator to discover the files and SourceReaders to read the
> files. This way, checkpoints can be triggered between two calls of
> pollNext, so you don't have to implement it asynchronously. It would be
> better if the readers read the lines and the records were enriched in a
> following map function.
>
> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My use
>> case is reading files from an S3 bucket, filtering the file contents (say,
>> records), enriching each record, and writing the filtered records to a sink.
>> I'm reading 6k files every 15 minutes, and the total number of records is
>> 3 billion per 15 minutes. I'm using a flatMap operator to convert each file
>> into records and enrich the records in a synchronous call.
>>
>> *Problem*: My application fails to run (checkpoint timeout) if I add
>> more filter criteria (operators). I suspect the system is not able to scale
>> (CPU utilization is still 20%) because of the synchronous call. I want to
>> convert this flatMap to an asynchronous call using AsyncFunction. I was
>> looking for something like AsyncCollector.collect
>> <https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.html#collect-java.util.Collection->
>> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
>> to complement my current synchronous implementation using flatMap, but it
>> seems like this is not available in Flink 1.11.
>>
>> *Question*:
>> Could someone please help me with converting this flatMap operation to an
>> asynchronous call?
>>
>> Please let me know if you have any questions.
>>
>> Best,
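For reference, the control flow Gen describes can be modeled in plain Java. This is a minimal sketch, NOT Flink's Source API: `FileSplit`, `Enumerator`, `Reader`, `pollNext`, `snapshot`, and `run` are hypothetical stand-ins (roughly playing the roles of a split, a SplitEnumerator, and a SourceReader), and files are assumed to be in-memory lists of lines instead of S3 objects. The point it illustrates is that the reader emits at most one record per `pollNext` call, so its position (current split plus line offset) can be snapshotted between any two calls, and enrichment happens in a separate map step rather than inside the reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class SplitReaderSketch {

    // Hypothetical split: one file, identified by its path.
    static final class FileSplit {
        final String path;
        FileSplit(String path) { this.path = path; }
    }

    // Hypothetical enumerator: holds the discovered files and hands out one split at a time.
    static final class Enumerator {
        private final Deque<FileSplit> pending = new ArrayDeque<>();

        Enumerator(List<String> discoveredPaths) {
            for (String p : discoveredPaths) {
                pending.add(new FileSplit(p));
            }
        }

        Optional<FileSplit> nextSplit() {
            return Optional.ofNullable(pending.poll());
        }
    }

    // Hypothetical reader: returns at most one line per pollNext() call, so the
    // caller regains control (e.g. to take a checkpoint) after every record.
    static final class Reader {
        private final Enumerator enumerator;
        private final Map<String, List<String>> files; // stand-in for the S3 objects
        private FileSplit current;
        private int offset; // line offset within the current split

        Reader(Enumerator enumerator, Map<String, List<String>> files) {
            this.enumerator = enumerator;
            this.files = files;
        }

        Optional<String> pollNext() {
            while (true) {
                if (current == null) {
                    Optional<FileSplit> next = enumerator.nextSplit();
                    if (!next.isPresent()) {
                        return Optional.empty(); // all splits consumed
                    }
                    current = next.get();
                    offset = 0;
                }
                List<String> lines = files.get(current.path);
                if (offset < lines.size()) {
                    return Optional.of(lines.get(offset++));
                }
                current = null; // split exhausted; request the next one
            }
        }

        // The state a checkpoint would need to resume reading: split path and offset.
        String snapshot() {
            return current == null ? "idle" : current.path + "@" + offset;
        }
    }

    // Drives the reader like a task loop would; enrichment is a separate map step.
    static List<String> run(Map<String, List<String>> files, List<String> order,
                            Function<String, String> enrich, List<String> snapshots) {
        Reader reader = new Reader(new Enumerator(order), files);
        List<String> out = new ArrayList<>();
        Optional<String> line;
        while ((line = reader.pollNext()).isPresent()) {
            out.add(enrich.apply(line.get()));
            snapshots.add(reader.snapshot()); // a checkpoint could be taken at this point
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = Map.of(
                "a.txt", List.of("r1", "r2"),
                "b.txt", List.of("r3"));
        List<String> snapshots = new ArrayList<>();
        List<String> out = run(files, List.of("a.txt", "b.txt"), String::toUpperCase, snapshots);
        System.out.println(out);       // [R1, R2, R3]
        System.out.println(snapshots); // [a.txt@1, a.txt@2, b.txt@1]
    }
}
```

In a real Flink job the framework, not a hand-written loop like `run`, calls the reader and triggers checkpoints, so check the Source API documentation for the Flink version in use before mapping these stand-ins onto actual interfaces.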