Is there any way to identify the last message inside a RichFunction in BATCH mode?
On Wed, Feb 9, 2022 at 8:56 AM [email protected] <[email protected]> wrote:
> I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x
> DataStream API. mapPartition is not available in Flink DataStream.
>
> *Current code using Flink 1.12.x DataSet:*
>
>     dataset
>         .<few operations>
>         .mapPartition(new SomeMapPartitionFn())
>         .<few more operations>
>
>     public static class SomeMapPartitionFn extends
>             RichMapPartitionFunction<InputModel, OutputModel> {
>
>         @Override
>         public void mapPartition(Iterable<InputModel> records,
>                 Collector<OutputModel> out) throws Exception {
>             for (InputModel record : records) {
>                 /*
>                  do some operation
>                 */
>                 if (/* some condition based on processing MULTIPLE records */) {
>                     out.collect(...); // Conditional collect ---> (1)
>                 }
>             }
>
>             // At the end of the data, collect
>             out.collect(...); // Collect processed data ---> (2)
>         }
>     }
>
> - (1) - Collector.collect invoked based on some condition after
>   processing a few records
> - (2) - Collector.collect invoked at the end of the data
>
> Initially we thought of using flatMap instead of mapPartition, but the
> Collector is not available in the close() method.
>
> https://issues.apache.org/jira/browse/FLINK-14709 - only available in
> the case of chained drivers
>
> How can this be implemented with the Flink 1.14.x DataStream API? Please advise.
>
> *Note*: Our application works only with a finite data set (batch mode).
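One way to frame the migration: the body of mapPartition splits into two callbacks, one invoked per record (covering collect (1)) and one invoked once after the last record of the partition (covering collect (2)). The sketch below is plain Java with no Flink dependency, so it only shows that split, not the Flink wiring. The class name BufferingPartitionLogic, the String record type, the Consumer standing in for Collector, and the "emit every N records" condition are all hypothetical stand-ins for InputModel, OutputModel and the real multi-record condition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for the body of SomeMapPartitionFn, split into the two
 * callbacks a DataStream operator receives: one call per record, plus one call
 * when the bounded input is exhausted. No Flink classes are used here; the
 * Consumer plays the role of Flink's Collector.
 */
final class BufferingPartitionLogic {
    private final int batchSize;                  // hypothetical condition: emit every N records
    private final List<String> buffer = new ArrayList<>();

    BufferingPartitionLogic(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Called once per record; corresponds to the loop body and collect (1). */
    void processRecord(String record, Consumer<List<String>> out) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {         // condition that depends on several records
            out.accept(new ArrayList<>(buffer));  // conditional collect ---> (1)
            buffer.clear();
        }
    }

    /** Called once after the last record; corresponds to collect (2). */
    void endOfInput(Consumer<List<String>> out) {
        if (!buffer.isEmpty()) {
            out.accept(new ArrayList<>(buffer));  // final collect ---> (2)
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> emitted = new ArrayList<>();
        BufferingPartitionLogic logic = new BufferingPartitionLogic(2);
        for (String r : Arrays.asList("a", "b", "c")) {
            logic.processRecord(r, emitted::add); // per-record callback
        }
        logic.endOfInput(emitted::add);           // end-of-input callback
        System.out.println(emitted);              // prints [[a, b], [c]]
    }
}
```

If this shape fits, in Flink 1.14 the per-record method would typically be called from processElement(...) of a custom operator (for example one extending AbstractStreamOperator and implementing OneInputStreamOperator), and the end-of-input method from endInput(), declared by the BoundedOneInput interface, which the runtime calls once the bounded input of that parallel instance is exhausted. Such an operator is attached with DataStream#transform(...). These are lower-level operator APIs rather than RichFunction methods, so it is worth checking their stability guarantees for 1.14 before relying on them.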
