Hi, I am studying data skew processing in Flink and how I can change the low-level control of physical partition ( https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning) in order to have an even processing of tuples. I have created synthetic skewed data sources and I aim to process (aggregate) them over a window. Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
streamTrainsStation01.union(streamTrainsStation02) .union(streamTicketsStation01).union(streamTicketsStation02) // map the keys .map(new StationPlatformMapper(metricMapper)).name(metricMapper) .rebalance() // or .rescale() .shuffle() .keyBy(new StationPlatformKeySelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction) .setParallelism(4) .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper) .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction) ; According to the Flink dashboard I could not see too much difference among .shuffle(), .rescale(), and .rebalance(). Even though the documentation says rebalance() transformation is more suitable for data skew. After that I tried to use .partitionCustom(partitioner, "someKey"). However, for my surprise, I could not use setParallelism(4) on the window operation. The documentation says "Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.". I did not understand why. If I am allowed to do partitionCustom why can't I use parallelism after that? Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74 streamTrainsStation01.union(streamTrainsStation02) .union(streamTicketsStation01).union(streamTicketsStation02) // map the keys .map(new StationPlatformMapper(metricMapper)).name(metricMapper) .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector()) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))) .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction) .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper) .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction) ; Thanks, Felipe *--* *-- Felipe Gutierrez* *-- skype: felipe.o.gutierrez* *--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>*