Hi to all,

in my program I have a DataSet that generates different types of objects depending on the incoming element, so it behaves like a Map<Tuple2,Object>. To give each generated dataset its own type, I currently do something like this:

DataSet<Tuple2> start = ...
DataSet<MyObj1> ds1 = start.filter().map(..);
DataSet<MyObj1> ds2 = start.filter().map(..);
DataSet<MyObj3> ds3 = start.filter().map(..);
DataSet<MyObj3> ds4 = start.filter().map(..);

However, this is very inefficient (I think because Flink needs to materialize the entire source dataset for every slot). It is much more efficient to group the generation of objects of the same type, e.g.:

DataSet<Tuple2> start = ..
DataSet<MyObj1> tmp1 = start.map(..);
DataSet<MyObj3> tmp2 = start.map(..);
DataSet<MyObj1> ds1 = tmp1.filter();
DataSet<MyObj1> ds2 = tmp1.filter();
DataSet<MyObj3> ds3 = tmp2.filter();
DataSet<MyObj3> ds4 = tmp2.filter();

Increasing the number of slots per task manager makes things worse and worse :)

Is there a way to improve this situation? Is it possible to write a single "map" that generates different types of objects and then filter them by the generated class type?

Best,
Flavio
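
To make the last question concrete, here is a rough sketch of the "one map, then filter by class" idea. It deliberately uses plain java.util.stream instead of the Flink API so that it runs standalone. Element (a stand-in for Tuple2), the fields of MyObj1 and MyObj3, and the helpers toObject, generate and selectByType are all hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TypedSplit {
    // Hypothetical stand-ins for the MyObj1 / MyObj3 types from the post.
    static final class MyObj1 {
        final String value;
        MyObj1(String value) { this.value = value; }
    }

    static final class MyObj3 {
        final int value;
        MyObj3(int value) { this.value = value; }
    }

    // Hypothetical input element standing in for Tuple2: a tag plus a payload.
    static final class Element {
        final String tag;
        final String payload;
        Element(String tag, String payload) { this.tag = tag; this.payload = payload; }
    }

    // The single "map": decides per element which type of object to build.
    static Object toObject(Element e) {
        if ("num".equals(e.tag)) {
            return new MyObj3(Integer.parseInt(e.payload));
        }
        return new MyObj1(e.payload);
    }

    // One pass over the source, producing a collection of mixed types.
    static List<Object> generate(List<Element> start) {
        return start.stream().map(TypedSplit::toObject).collect(Collectors.toList());
    }

    // Filter by the generated class, then cast to recover the static type.
    static <T> List<T> selectByType(List<Object> mixed, Class<T> type) {
        return mixed.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Element> start = Arrays.asList(
                new Element("str", "a"),
                new Element("num", "42"),
                new Element("str", "b"));
        List<Object> mixed = generate(start);
        List<MyObj1> ds1 = selectByType(mixed, MyObj1.class);
        List<MyObj3> ds3 = selectByType(mixed, MyObj3.class);
        System.out.println(ds1.size() + " MyObj1, " + ds3.size() + " MyObj3");
    }
}
```

In Flink the same shape would presumably be a map to a common supertype followed by a filter on the class and a casting map per target type. Whether that actually avoids the repeated materialization described above is not something this sketch can show.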