Hi Saravanan,

AFAIK the last record is not treated differently.

Does the approach in [1] not work? 

Best regards,
Niklas

[1] https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279


> On 9. Feb 2022, at 20:31, saravana...@gmail.com <saravana...@gmail.com> wrote:
> 
> Is there any way to identify the last message inside a RichFunction in BATCH
> mode?
> 
> 
> 
> On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <saravana...@gmail.com> 
> wrote:
> I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x
> DataStream API, but mapPartition is not available in DataStream.
> 
> Current code using the Flink 1.12.x DataSet API:
> 
> dataset
>     .<few operations>
>     .mapPartition(new SomeMapPartitionFn())
>     .<few more operations>
> 
> public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
> 
>     @Override
>     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>         for (InputModel record : records) {
>             /*
>             do some operation
>              */
>             if (/* some condition based on processing *MULTIPLE* records */) {
> 
>                 out.collect(...); // Conditional collect                ---> (1)
>             }
>         }
> 
>         // At the end of the data, collect
> 
>         out.collect(...);   // Collect processed data                   ---> (2)
>     }
> }
> 
>       • (1) - Collector.collect is invoked based on some condition after processing a few records
>       • (2) - Collector.collect is invoked at the end of the data
> 
> Initially we thought of using flatMap instead of mapPartition, but the
> collector is not available in the close() method.
> 
> https://issues.apache.org/jira/browse/FLINK-14709 - this is only available in the case of
> chained drivers.
> How can this be implemented with the Flink 1.14.x DataStream API? Please advise...
> 
> Note: our application works only with a finite data set (batch mode).
> 
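A possible way to port SomeMapPartitionFn is to restate the loop body push-style: one step per record, plus one step that runs after the last record. As far as I know, the Flink 1.14 DataStream API exposes that end-of-input hook as BoundedOneInput#endInput(), which a custom operator (for example one extending AbstractStreamOperator and implementing OneInputStreamOperator, attached with DataStream#transform) can implement; unlike RichFunction#close(), such an operator still has an output it can emit to at that point, and each parallel operator instance plays roughly the role of a DataSet partition. The sketch below is plain Java with no Flink dependency and only models that shape under these assumptions: PartitionProcessor, GroupSumProcessor and runPartition are hypothetical names, and the "emit a sum every N records, then flush the remainder" logic is a made-up stand-in for the elided condition (1) and final collect (2).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PushBasedPartitionSketch {

    /**
     * Hypothetical push-based replacement for a mapPartition body. The caller invokes
     * processElement once per record and endInput exactly once after the last record.
     * The method names mirror Flink's processElement/endInput hooks, but this is a
     * plain-Java stand-in, not a Flink interface.
     */
    interface PartitionProcessor<IN, OUT> {
        void processElement(IN record, Consumer<OUT> out);

        void endInput(Consumer<OUT> out);
    }

    /** Hypothetical example logic: emit one sum per group of groupSize records. */
    static final class GroupSumProcessor implements PartitionProcessor<Integer, Integer> {
        private final int groupSize;
        private int count = 0; // records seen in the current, not yet emitted group
        private int sum = 0;   // running sum of the current group

        GroupSumProcessor(int groupSize) {
            this.groupSize = groupSize;
        }

        @Override
        public void processElement(Integer record, Consumer<Integer> out) {
            count++;
            sum += record;
            if (count == groupSize) { // condition depending on several records, like (1)
                out.accept(sum);
                count = 0;
                sum = 0;
            }
        }

        @Override
        public void endInput(Consumer<Integer> out) {
            if (count > 0) { // flush the leftover group at the end of the data, like (2)
                out.accept(sum);
            }
        }
    }

    /** Simulates a bounded runtime: push every record, then signal end of input once. */
    static <IN, OUT> List<OUT> runPartition(Iterable<IN> records, PartitionProcessor<IN, OUT> fn) {
        List<OUT> results = new ArrayList<>();
        for (IN record : records) {
            fn.processElement(record, results::add);
        }
        fn.endInput(results::add);
        return results;
    }

    public static void main(String[] args) {
        List<Integer> out = runPartition(List.of(1, 2, 3, 4, 5), new GroupSumProcessor(2));
        System.out.println(out); // prints [3, 7, 5]
    }
}
```

Because the state (count, sum) lives in the processor instead of in a local loop, the same per-record and end-of-input methods could in principle be called from an operator's processElement and endInput, without needing a collector in close().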
