I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x
DataStream API. mapPartition is not available on DataStream.
*Current code using the Flink 1.12.x DataSet API:*
    dataset
        .<few operations>
        .mapPartition(new SomeMapPartitionFn())
        .<few more operations>

    public static class SomeMapPartitionFn
            extends RichMapPartitionFunction<InputModel, OutputModel> {

        @Override
        public void mapPartition(Iterable<InputModel> records,
                                 Collector<OutputModel> out) throws Exception {
            for (InputModel record : records) {
                /*
                 * do some operation
                 */
                if (/* some condition based on processing MULTIPLE records */) {
                    out.collect(...); // Conditional collect ---> (1)
                }
            }
            // At the end of the data, collect
            out.collect(...); // Collect processed data ---> (2)
        }
    }

- (1) Collector.collect is invoked based on some condition, after processing
  a few records
- (2) Collector.collect is invoked at the end of the data
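
To make the two collect points concrete, here is a minimal plain-Java sketch of the pattern with no Flink dependency. Everything in it is a stand-in chosen for illustration and is not our real logic: the class name PartitionEmitSketch, the BATCH_SIZE threshold used as the "condition based on multiple records", the Integer records, and the Consumer playing the role of Collector. It also skips the final emit when nothing is buffered, which our real function may or may not want.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PartitionEmitSketch {

    // Hypothetical threshold standing in for "some condition based on
    // processing multiple records".
    static final int BATCH_SIZE = 3;

    // Mirrors the shape of mapPartition: one pass over all records of the
    // partition, with a Consumer standing in for Flink's Collector.
    static void mapPartition(Iterable<Integer> records, Consumer<Integer> out) {
        int sum = 0;
        int count = 0;
        for (int record : records) {
            sum += record;
            count++;
            if (count == BATCH_SIZE) {
                out.accept(sum); // conditional collect ---> (1)
                sum = 0;
                count = 0;
            }
        }
        // End of data: flush whatever is still buffered ---> (2)
        if (count > 0) {
            out.accept(sum);
        }
    }

    public static void main(String[] args) {
        List<Integer> collected = new ArrayList<>();
        mapPartition(List.of(1, 2, 3, 4, 5, 6, 7), collected::add);
        System.out.println(collected); // prints [6, 15, 7]
    }
}
```

The last value (7) is the part that needs an end-of-input hook: it is only emitted because the function knows the data has ended.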
Initially we thought of using flatMap instead of mapPartition, but the
collector is not available in the close function. See
https://issues.apache.org/jira/browse/FLINK-14709 (only available in the
case of chained drivers).
How can this be implemented with the Flink 1.14.x DataStream API? Please advise.
*Note*: Our application works only with finite data sets (batch mode).