Hi,

I am studying how Flink deals with data skew and how I can use the
low-level control over physical partitioning
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
to achieve an even processing of tuples. I have created synthetic
skewed data sources, and I aim to process (aggregate) them over a window.
Here is the complete code:
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .rebalance() // or .rescale() / .shuffle()
        .keyBy(new StationPlatformKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .setParallelism(4)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction);
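To make the comparison concrete, here is a simplified, Flink-free simulation of how the three strategies in the `.rebalance() // or .rescale() .shuffle()` line choose a downstream channel for each record. It is a sketch, not Flink's partitioner code: the class and method names are hypothetical, rebalance is simplified to start at channel 0, and rescale assumes the downstream parallelism is a multiple of the upstream one. Note that none of the three strategies looks at the record or its key.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Flink-free simulation of how shuffle(), rebalance() and rescale() choose a
 * downstream channel per record. Simplified; not Flink's actual partitioner code.
 */
public class PhysicalPartitioningSketch {

    /** shuffle(): a uniformly random downstream channel. */
    static int shuffle(Random rnd, int downstreamParallelism) {
        return rnd.nextInt(downstreamParallelism);
    }

    /** rebalance(): round-robin over ALL downstream channels (simplified: starts at channel 0). */
    static int rebalance(int recordIndex, int downstreamParallelism) {
        return recordIndex % downstreamParallelism;
    }

    /**
     * rescale(): round-robin over only the slice of downstream subtasks this
     * upstream subtask is connected to. Assumes downstream parallelism is a
     * multiple of upstream parallelism.
     */
    static int rescale(int upstreamSubtask, int upstreamParallelism,
                       int recordIndex, int downstreamParallelism) {
        int slice = downstreamParallelism / upstreamParallelism;
        return upstreamSubtask * slice + recordIndex % slice;
    }

    /** Counts how many of n records one upstream subtask sends to each downstream channel. */
    static int[] distribute(String strategy, int upstreamSubtask, int upstreamParallelism,
                            int downstreamParallelism, int n, long seed) {
        Random rnd = new Random(seed);
        int[] counts = new int[downstreamParallelism];
        for (int i = 0; i < n; i++) {
            int channel;
            switch (strategy) {
                case "shuffle":
                    channel = shuffle(rnd, downstreamParallelism);
                    break;
                case "rebalance":
                    channel = rebalance(i, downstreamParallelism);
                    break;
                case "rescale":
                    channel = rescale(upstreamSubtask, upstreamParallelism, i, downstreamParallelism);
                    break;
                default:
                    throw new IllegalArgumentException("unknown strategy: " + strategy);
            }
            counts[channel]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Upstream subtask 0 (of 2) sends 1000 records to 4 downstream subtasks.
        for (String strategy : new String[] {"shuffle", "rebalance", "rescale"}) {
            int[] counts = distribute(strategy, 0, 2, 4, 1000, 42L);
            System.out.println(strategy + " -> " + Arrays.toString(counts));
        }
    }
}
```

Running `main` prints `[250, 250, 250, 250]` for rebalance and `[500, 500, 0, 0]` for rescale; the shuffle counts depend on the seed but always sum to 1000.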

According to the Flink dashboard, I could not see much difference among
.shuffle(), .rescale(), and .rebalance(), even though the documentation
says the rebalance() transformation is useful in the presence of data skew.
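The `keyBy(new StationPlatformKeySelector())` that follows the partitioning step routes every record by its key. The sketch below models that routing under simplifying assumptions: Flink maps a key to a key group and the key group to a subtask (roughly `keyGroup * parallelism / maxParallelism`), but it also applies a murmur hash to `hashCode()`, which this sketch omits. `MAX_PARALLELISM = 128`, the 80% hot-key layout, and the key strings are hypothetical values chosen for illustration only.

```java
import java.util.Arrays;

/**
 * Flink-free sketch of keyBy() routing: key -> key group -> subtask.
 * Simplified: Flink additionally applies a murmur hash to key.hashCode().
 */
public class KeyedRoutingSketch {

    /** Assumed max parallelism for a job with parallelism 4. */
    static final int MAX_PARALLELISM = 128;

    /** Key group of a key; depends only on the key, never on the upstream partitioner. */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Subtask that owns a key group: keyGroup * parallelism / maxParallelism. */
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** Hypothetical skewed input: 80% of the records carry one hot station/platform key. */
    static String[] skewedKeys(int n) {
        String[] keys = new String[n];
        for (int i = 0; i < n; i++) {
            keys[i] = (i % 10 < 8)
                    ? "station01-platform01"
                    : "station02-platform0" + (i % 3 + 1);
        }
        return keys;
    }

    /** Records received by each keyed window subtask, whatever partitioning preceded keyBy(). */
    static int[] recordsPerSubtask(String[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : keys) {
            counts[subtaskForKey(key, MAX_PARALLELISM, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = recordsPerSubtask(skewedKeys(1000), 4);
        // The subtask owning the hot key receives at least 800 of the 1000 records.
        System.out.println(Arrays.toString(counts));
    }
}
```

In this model, `subtaskForKey` takes only the key as input, so whichever of `.shuffle()`, `.rescale()` or `.rebalance()` ran before `keyBy` cannot change which window subtask receives the hot key's 800 records.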

After that, I tried to use .partitionCustom(partitioner, "someKey").
However, to my surprise, I could not use setParallelism(4) on the window
operation. The documentation says "Note: This operation is inherently
non-parallel since all elements have to pass through the same operator
instance.", and I do not understand why. If I am allowed to use
partitionCustom, why can't I set the parallelism after it?
Here is the complete code:
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction);
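For reference, a custom partitioner only picks a channel index in `[0, numPartitions)`, where `numPartitions` is the parallelism of the downstream operator (Flink's `Partitioner<K>` declares `int partition(K key, int numPartitions)`). The Flink-free sketch below uses a local stand-in interface with that shape and a hypothetical hot-key-aware partitioner (not the actual `StationPlatformKeyCustomPartitioner`). With a downstream operator of parallelism 1, which is how a `windowAll` window runs, the only valid index is 0.

```java
import java.util.Arrays;

/**
 * Flink-free sketch of the custom-partitioner contract. KeyPartitioner is a local
 * stand-in with the same shape as Flink's Partitioner<K>#partition(K, int).
 */
public class CustomPartitionerSketch {

    @FunctionalInterface
    interface KeyPartitioner<K> {
        /** Must return a channel index in [0, numPartitions). */
        int partition(K key, int numPartitions);
    }

    /** Hypothetical hot key, for illustration only. */
    static final String HOT_KEY = "station01-platform01";

    /** Hypothetical partitioner: the hot key gets channel 0, other keys share the rest. */
    static final KeyPartitioner<String> HOT_KEY_AWARE = (key, numPartitions) -> {
        if (numPartitions == 1) {
            return 0; // a single downstream instance: nothing to choose
        }
        if (key.equals(HOT_KEY)) {
            return 0;
        }
        return 1 + Math.floorMod(key.hashCode(), numPartitions - 1);
    };

    /** Counts records per downstream channel for a given downstream parallelism. */
    static int[] countPerChannel(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[HOT_KEY_AWARE.partition(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] keys = {HOT_KEY, HOT_KEY, HOT_KEY,
                "station01-platform02", "station02-platform01", "station02-platform02"};
        // Downstream parallelism 4: the hot key is isolated on channel 0.
        System.out.println(Arrays.toString(countPerChannel(keys, 4)));
        // Downstream parallelism 1 (as with windowAll): every record lands on channel 0.
        System.out.println(Arrays.toString(countPerChannel(keys, 1)));
    }
}
```

The second line of output is `[6]`: however the partitioner is written, a single downstream instance receives every record.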

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
