Is there any way to identify the last message inside a RichFunction in BATCH
mode?



On Wed, Feb 9, 2022 at 8:56 AM [email protected] <[email protected]>
wrote:

> I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x
> DataStream API. mapPartition is not available on DataStream.
> *Current code using the Flink 1.12.x DataSet API:*
>
> dataset
>     .<few operations>
>     .mapPartition(new SomeMapPartitionFn())
>     .<few more operations>
>
> public static class SomeMapPartitionFn
>         extends RichMapPartitionFunction<InputModel, OutputModel> {
>
>     @Override
>     public void mapPartition(Iterable<InputModel> records,
>             Collector<OutputModel> out) throws Exception {
>         for (InputModel record : records) {
>             /*
>             do some operation
>              */
>             if (/* some condition based on processing MULTIPLE records */) {
>                 out.collect(...); // Conditional collect ---> (1)
>             }
>         }
>
>         // At the end of the data, collect
>         out.collect(...); // Collect processed data ---> (2)
>     }
> }
>
>
>    - (1) Collector.collect is invoked based on some condition, after
>      processing a few records.
>    - (2) Collector.collect is invoked at the end of the data.
>
> Initially we thought of using flatMap instead of mapPartition, but the
> collector is not available in the close function. See
> https://issues.apache.org/jira/browse/FLINK-14709 (only available in
> the case of chained drivers).
>
> How can this be implemented with the Flink 1.14.x DataStream API? Please
> advise.
>
> *Note*: Our application works only with finite data sets (batch mode).
>
>
>
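To make the requirement in the quoted message concrete, here is a minimal plain-Java sketch of the pattern: emit conditionally while records arrive (1), then flush whatever is left once the input is exhausted (2). Everything in it is hypothetical and independent of Flink: BoundedProcessor, BatchingSum and run are illustrative names, and java.util.function.Consumer stands in for Flink's Collector. On the Flink side, the operator-level hook with this shape is BoundedOneInput#endInput(), which a custom stream operator can implement and attach with DataStream#transform; whether that fits this particular job is an assumption, not something confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class EndOfInputSketch {

    /** Hypothetical stand-in for an operator that is notified when bounded input ends. */
    interface BoundedProcessor<I, O> {
        void processElement(I value, Consumer<O> out);

        void endInput(Consumer<O> out);
    }

    /** Emits the sum of every full group of batchSize records, then the remainder at the end. */
    static final class BatchingSum implements BoundedProcessor<Integer, Integer> {
        private final int batchSize;
        private int count;
        private int sum;

        BatchingSum(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void processElement(Integer value, Consumer<Integer> out) {
            sum += value;
            count++;
            if (count == batchSize) { // condition based on multiple records ---> (1)
                out.accept(sum);
                sum = 0;
                count = 0;
            }
        }

        @Override
        public void endInput(Consumer<Integer> out) {
            if (count > 0) { // flush the leftover state at the end of the data ---> (2)
                out.accept(sum);
            }
        }
    }

    /** Drives a processor over a finite input, calling endInput exactly once at the end. */
    static <I, O> List<O> run(BoundedProcessor<I, O> processor, List<I> input) {
        List<O> result = new ArrayList<>();
        for (I value : input) {
            processor.processElement(value, result::add);
        }
        processor.endInput(result::add);
        return result;
    }

    public static void main(String[] args) {
        // Groups of three: 1+2+3 is emitted mid-stream, 4+5 only at end of input.
        System.out.println(run(new BatchingSum(3), List.of(1, 2, 3, 4, 5))); // prints [6, 9]
    }
}
```

The point of the sketch is that the final collect needs a callback that still has access to the output, which is what close() on a rich function does not provide.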
