Just a small addition:

If two hot keys fall into two different key groups that are processed by the
same TM, then it could help to change the parallelism, because the assignment
of key groups to parallel tasks might then be different.

If two hot keys fall into the same key group, you can adjust the max
parallelism, which defines how many key groups are used. With a different
number of key groups, the two hot keys might end up in different key
groups.
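
To make the two cases concrete, here is a minimal sketch in plain Java (no
Flink dependency) of the two-step assignment. The class name KeyGroupSketch,
the mix() function, and the key names are made up for illustration. In
particular, mix() is only a stand-in: Flink applies its own murmur hash to
key.hashCode(), so the key groups printed here will not match a real job.
The subtask formula is, as far as I know, the one Flink uses.

```java
// Sketch only: mimics the two-step key -> key group -> subtask assignment.
final class KeyGroupSketch {

    // Stand-in bit mixer (assumption, NOT Flink's actual hash function).
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Step 1: key -> key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Step 2: key group -> parallel subtask index in [0, parallelism).
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        String[] hotKeys = {"station-1", "station-2"};      // hypothetical keys
        int[][] settings = {{128, 4}, {128, 6}, {256, 4}};  // {maxParallelism, parallelism}
        for (int[] s : settings) {
            for (String key : hotKeys) {
                int group = keyGroupFor(key, s[0]);
                System.out.printf(
                        "maxParallelism=%d parallelism=%d key=%s keyGroup=%d subtask=%d%n",
                        s[0], s[1], key, group, subtaskFor(group, s[0], s[1]));
            }
        }
    }
}
```

With 128 key groups, key groups 0 and 31 both land on subtask 0 at
parallelism 4, but on subtasks 0 and 1 at parallelism 6. Two keys in the
same key group always share a subtask, whatever the parallelism, so only a
different max parallelism can separate them. In a job, the corresponding
settings are setParallelism(...) and setMaxParallelism(...).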

Cheers,
Till

On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Felipe,
>
> three comments:
>
> 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
> effect:
> keyBy() introduces a hash partitioning such that any data partitioning
> that you do immediately before keyBy() is destroyed.
> You only change the distribution for the call of the key extractor which
> should be a lightweight function anyway.
> That's why you do not see any difference between the three methods.
>
> 2) windowAll() defines a non-keyed window over the whole stream.
> All records are processed by the same non-parallel instance of the window
> operator.
> That's why assigning a higher parallelism to that operator does not help.
>
> 3) One approach to improving the processing of skewed data is to change how
> keyed state is handled.
> Flink's keyed state is partitioned in two steps:
> 1. each key is assigned to a key group based on an internal hash function.
> 2. each key group is assigned to and processed by a parallel operator task.
> For full control over data placement, you need to control both.
> Changing 1) is tricky because it affects savepoint compatibility.
> Changing 2) does not help if two hot keys are assigned to the same key
> group.
>
> Best, Fabian
>
> On Wed, Apr 10, 2019 at 11:50 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I am studying data skew processing in Flink and how I can use the
>> low-level control of physical partitioning (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
>> to process tuples evenly. I have created synthetic skewed data sources
>> and I aim to process (aggregate) them over a window.
>> Here is the complete code:
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>>
>> streamTrainsStation01.union(streamTrainsStation02)
>>     .union(streamTicketsStation01).union(streamTicketsStation02)
>>     // map the keys
>>     .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>     .rebalance() // or .rescale() or .shuffle()
>>     .keyBy(new StationPlatformKeySelector())
>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>     .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>     .setParallelism(4)
>>     .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>     .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction);
>>
>> According to the Flink dashboard, I could not see much difference
>> among .shuffle(), .rescale(), and .rebalance(), even though the
>> documentation says the rebalance() transformation is more suitable for
>> data skew.
>>
>> After that I tried to use .partitionCustom(partitioner, "someKey").
>> However, to my surprise, I could not use setParallelism(4) on the window
>> operation. The documentation says "Note: This operation is inherently
>> non-parallel since all elements have to pass through the same operator
>> instance." I do not understand why. If I am allowed to use partitionCustom,
>> why can't I set a parallelism after that?
>> Here is the complete code:
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74
>>
>> streamTrainsStation01.union(streamTrainsStation02)
>>     .union(streamTicketsStation01).union(streamTicketsStation02)
>>     // map the keys
>>     .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>     .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
>>     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>     .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>     .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>     .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction);
>>
>> Thanks,
>> Felipe
>>
>> --
>> Felipe Gutierrez
>> skype: felipe.o.gutierrez
>> https://felipeogutierrez.blogspot.com
>>
>
