Hi Saravanan,

AFAIK the last record is not treated differently.

Does the approach in [1] not work?

Best regards,
Niklas

[1] https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279

> On 9. Feb 2022, at 20:31, saravana...@gmail.com <saravana...@gmail.com> wrote:
>
> Is there any way to identify the last message inside RichFunction in BATCH mode?
>
> On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <saravana...@gmail.com> wrote:
> I am trying to migrate from Flink 1.12.x DataSet API to Flink 1.14.x DataStream API. mapPartition is not available in Flink DataStream.
>
> Current code using Flink 1.12.x DataSet:
>
> dataset
>     .<few operations>
>     .mapPartition(new SomeMapPartitionFn())
>     .<few more operations>
>
> public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
>
>     @Override
>     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>         for (InputModel record : records) {
>             /*
>             do some operation
>             */
>             if (/* some condition based on processing *MULTIPLE* records */) {
>                 out.collect(...); // Conditional collect ---> (1)
>             }
>         }
>
>         // At the end of the data, collect
>         out.collect(...); // Collect processed data ---> (2)
>     }
> }
>
> • (1) - Collector.collect invoked based on some condition after processing a few records
> • (2) - Collector.collect invoked at the end of the data
>
> Initially we thought of using flatMap instead of mapPartition, but the collector is not available in the close function.
>
> https://issues.apache.org/jira/browse/FLINK-14709 - Only available in case of chained drivers
>
> How to implement this in Flink 1.14.x DataStream? Please advise...
>
> Note: Our application works with only a finite set of data (Batch Mode)
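
For reference, here is a minimal sketch of the pattern being asked about: emit conditionally while records arrive (1), then emit once more when the bounded input is exhausted (2). In Flink itself, a custom stream operator can implement the BoundedOneInput interface, whose endInput() method is called when the input ends. The sketch below does not use Flink classes: EndAwareOperator, BatchingOperator, the batch-size condition, and the Consumer used in place of Collector are all hypothetical stand-ins chosen so the example runs with plain Java. It also assumes a non-empty buffer is the right trigger for the final emit, which may not match the real logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Local stand-in (not a Flink type) mirroring the shape of an operator that is
// told about each record and, separately, about the end of a bounded input.
interface EndAwareOperator<IN> {
    void processElement(IN record);

    void endInput();
}

// Buffers records, emits when a condition over several records holds (1),
// and flushes whatever is left once the input has ended (2).
final class BatchingOperator implements EndAwareOperator<String> {
    private final int batchSize;           // hypothetical "some condition"
    private final Consumer<String> out;    // stand-in for Collector.collect
    private final List<String> buffer = new ArrayList<>();

    BatchingOperator(int batchSize, Consumer<String> out) {
        this.batchSize = batchSize;
        this.out = out;
    }

    @Override
    public void processElement(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            emit(); // (1) conditional collect
        }
    }

    @Override
    public void endInput() {
        if (!buffer.isEmpty()) {
            emit(); // (2) collect at the end of the data
        }
    }

    private void emit() {
        out.accept(String.join(",", buffer));
        buffer.clear();
    }
}

final class EndOfInputSketch {
    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        BatchingOperator op = new BatchingOperator(3, results::add);
        for (String record : List.of("a", "b", "c", "d")) {
            op.processElement(record);
        }
        op.endInput(); // the caller signals that no more records will arrive
        System.out.println(results); // prints [a,b,c, d]
    }
}
```

The point of the split is that the final collect no longer has to happen in close(), where no collector is available; it moves into a dedicated end-of-input callback that still has access to the output.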