I guess I could implement a solution that is not static by writing a custom operator that implements Flink's OneInputStreamOperator interface: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L84
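The non-static idea above can be sketched in plain Java, without any Flink dependency, as the per-record logic that such a custom operator might run. It counts how often each key occurs, treats a key as hot once it exceeds a given share of the records seen so far, and spreads only hot keys over several sub-keys (salts). The class name, the String key type, and the thresholds are hypothetical. In a real operator, each parallel instance would only see its own share of the stream, and this state would have to be reset periodically (for example per window) and checkpointed to be fault tolerant.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: detects hot keys at runtime instead of hard-coding them.
public class DynamicKeySalter {
    private final double hotFraction; // a key is hot if it exceeds this share of all records
    private final int saltBuckets;    // number of sub-keys a hot key is spread over
    private final int minRecords;     // no key is classified as hot before this many records
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Integer> nextSalt = new HashMap<>();
    private long total = 0;

    public DynamicKeySalter(double hotFraction, int saltBuckets, int minRecords) {
        if (saltBuckets < 1) {
            throw new IllegalArgumentException("saltBuckets must be >= 1");
        }
        this.hotFraction = hotFraction;
        this.saltBuckets = saltBuckets;
        this.minRecords = minRecords;
    }

    // Returns 0 for normal keys and a round-robin salt in [0, saltBuckets) for hot keys.
    public int saltFor(String key) {
        total++;
        long count = counts.merge(key, 1L, Long::sum);
        boolean hot = total >= minRecords && count > hotFraction * total;
        if (!hot) {
            return 0;
        }
        int salt = nextSalt.getOrDefault(key, 0);
        nextSalt.put(key, (salt + 1) % saltBuckets);
        return salt;
    }

    // Forgets all statistics, e.g. at the end of each window.
    public void reset() {
        counts.clear();
        nextSalt.clear();
        total = 0;
    }

    public static void main(String[] args) {
        DynamicKeySalter salter = new DynamicKeySalter(0.5, 4, 4);
        String[] keys = {"hot", "hot", "cold", "hot", "hot", "hot", "cold", "hot", "hot"};
        for (String key : keys) {
            System.out.println(key + " -> salt " + salter.saltFor(key));
        }
    }
}
```

Because a salted hot key is split into several partial results, a second aggregation on the original key is still needed downstream.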
Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
--
https://felipeogutierrez.blogspot.com


On Thu, Apr 11, 2019 at 2:21 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Thanks all for your suggestions!
>
> I am not sure whether option 3, which Fabian suggested, requires changing
> the Flink source code or whether it can be implemented on top of Flink.
> -------------------------
> 3) One approach to improve the processing of skewed data is to change how
> keyed state is handled.
> Flink's keyed state is partitioned in two steps:
> 1. each key is assigned to a key group based on an internal hash function.
> 2. each key group is assigned to and processed by a parallel operator task.
> For full control over data placement, you need to control both.
> Changing 1) is tricky because it affects savepoint compatibility.
> Changing 2) does not help if two hot keys are assigned to the same key
> group.
> -------------------------
> I did an experiment
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeySkewedDAG.java#L66>
> with a Mapper function
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/udfs/StationPlatformSkewedKeyMapper.java>
> that maps each record to a key with one extra parameter (a skew parameter).
> The results are better.
>
> Integer skewParameter = 0;
> if (stationId.equals(Integer.valueOf(2)) && platformId.equals(Integer.valueOf(3))) {
>     // this is the skewed key
>     skewParameter = this.skewParameterGenerator.getNextItem();
> }
> CompositeSkewedKeyStationPlatform compositeKey =
>     new CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);
>
> But it is still a static solution =(. That is, the developer has to
> hard-code in the Mapper which key is skewed.
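The salted composite key turns one logical key into several partial aggregates, so the partial results have to be merged again on the original key. Below is a minimal plain-Java sketch of this two-phase aggregation, assuming a simple count and representing the composite key as a hypothetical [stationId, platformId, salt] list; the class and method names are illustrative, not part of the experiment's code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of two-phase aggregation over salted keys.
public class TwoPhaseSaltedCount {

    // Phase 1: count per salted key [stationId, platformId, salt].
    // The salted copies of a hot key can be processed by different parallel tasks.
    public static Map<List<Integer>, Long> partialCounts(List<List<Integer>> saltedKeys) {
        Map<List<Integer>, Long> partial = new HashMap<>();
        for (List<Integer> key : saltedKeys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2: drop the salt and merge the partial counts per original key
    // [stationId, platformId].
    public static Map<List<Integer>, Long> mergeBySourceKey(Map<List<Integer>, Long> partial) {
        Map<List<Integer>, Long> merged = new HashMap<>();
        partial.forEach((key, count) ->
            merged.merge(List.of(key.get(0), key.get(1)), count, Long::sum));
        return merged;
    }

    public static void main(String[] args) {
        // Station 2 / platform 3 is the hot key (salts 0..2); station 1 / platform 1 is not salted.
        List<List<Integer>> keys = List.of(
            List.of(2, 3, 0), List.of(2, 3, 1), List.of(2, 3, 2),
            List.of(2, 3, 0), List.of(1, 1, 0));
        Map<List<Integer>, Long> partial = partialCounts(keys);
        Map<List<Integer>, Long> merged = mergeBySourceKey(partial);
        System.out.println(partial.size());               // 4
        System.out.println(merged.get(List.of(2, 3)));    // 4
    }
}
```

In a Flink job, the first phase could be a keyed window on the salted key, running with parallelism greater than one, and the second phase a keyed aggregation on the original key. The second phase only receives one partial result per salt value and window, so the hot key no longer dominates a single task.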
>
> Best,
> Felipe
>
> On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Just a small addition:
>>
>> If two hot keys fall into two key groups that are being processed by the
>> same TM (TaskManager), then it could help to change the parallelism,
>> because then the key group mapping might be different.
>>
>> If two hot keys fall into the same key group, you can adjust the max
>> parallelism, which defines how many key groups will be used. By changing
>> this number, it might happen that the two hot keys fall into different key
>> groups.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> three comments:
>>>
>>> 1) Applying rebalance(), shuffle(), or rescale() before a keyBy() has no
>>> effect: keyBy() introduces a hash partitioning, so any data partitioning
>>> that you do immediately before keyBy() is destroyed.
>>> You only change the distribution for the call of the key extractor, which
>>> should be a lightweight function anyway.
>>> That's why you do not see any difference between the three methods.
>>>
>>> 2) windowAll() defines a non-keyed window over the whole stream.
>>> All records are processed by the same non-parallel instance of the
>>> window operator.
>>> That's why assigning a higher parallelism to that operator does not help.
>>>
>>> 3) One approach to improve the processing of skewed data is to change
>>> how keyed state is handled.
>>> Flink's keyed state is partitioned in two steps:
>>> 1. each key is assigned to a key group based on an internal hash
>>> function.
>>> 2. each key group is assigned to and processed by a parallel operator
>>> task.
>>> For full control over data placement, you need to control both.
>>> Changing 1) is tricky because it affects savepoint compatibility.
>>> Changing 2) does not help if two hot keys are assigned to the same key
>>> group.
>>>
>>> Best, Fabian
>>>
>>> On Wed, Apr 10, 2019 at 11:50 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am studying data skew processing in Flink and how I can use the
>>>> low-level control over physical partitioning (
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
>>>> to process tuples evenly. I have created synthetic skewed data sources,
>>>> and I aim to process (aggregate) them over a window.
>>>> Here is the complete code:
>>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>>>>
>>>> streamTrainsStation01.union(streamTrainsStation02)
>>>>     .union(streamTicketsStation01).union(streamTicketsStation02)
>>>>     // map the keys
>>>>     .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>>>     .rebalance() // or .rescale() .shuffle()
>>>>     .keyBy(new StationPlatformKeySelector())
>>>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>>>     .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>>>     .setParallelism(4)
>>>>     .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>>>     .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction);
>>>>
>>>> According to the Flink dashboard, I could not see much difference
>>>> among .shuffle(), .rescale(), and .rebalance(), even though the
>>>> documentation says the rebalance() transformation is more suitable for
>>>> data skew.
>>>>
>>>> After that I tried to use .partitionCustom(partitioner, "someKey").
>>>> However, to my surprise, I could not use setParallelism(4) on the window
>>>> operation.
>>>> The documentation says "Note: This operation is inherently
>>>> non-parallel since all elements have to pass through the same operator
>>>> instance." I do not understand why. If I am allowed to use
>>>> partitionCustom, why can't I set the parallelism after that?
>>>> Here is the complete code:
>>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74
>>>>
>>>> streamTrainsStation01.union(streamTrainsStation02)
>>>>     .union(streamTicketsStation01).union(streamTicketsStation02)
>>>>     // map the keys
>>>>     .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>>>     .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
>>>>     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>>>     .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>>>     .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>>>     .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction);
>>>>
>>>> Thanks,
>>>> Felipe
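The two-step placement that Fabian and Till describe (key to key group, then key group to parallel task) can be illustrated with a small plain-Java sketch. It mirrors the structure of Flink's key-group assignment: a hash of key.hashCode() modulo the max parallelism gives the key group, and keyGroup * parallelism / maxParallelism gives the operator subtask. The hash function here is a simplified stand-in, not Flink's internal murmur hash, so the concrete group numbers will differ from what Flink computes. The class name, method names, and the "stationId-platformId" string keys are hypothetical.

```java
// Hypothetical sketch of Flink-style key-group assignment (not Flink's exact hash).
public class KeyGroupSketch {

    // Stand-in hash mixer; Flink uses its own murmur-based hash instead.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // force a non-negative value
    }

    // Step 1: key -> key group. There are exactly maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> operator subtask. Each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        String[] hotKeys = {"2-3", "1-1"}; // hypothetical "stationId-platformId" keys
        for (int maxParallelism : new int[] {128, 256}) {
            for (String key : hotKeys) {
                System.out.printf("maxParallelism=%d key=%s keyGroup=%d subtask=%d%n",
                    maxParallelism, key, keyGroupFor(key, maxParallelism),
                    subtaskForKey(key, maxParallelism, 4));
            }
        }
    }
}
```

Since subtaskForKey depends only on the key, maxParallelism, and parallelism, whichever of rebalance(), shuffle(), or rescale() ran before keyBy() cannot change where a record's keyed state ends up, which matches Fabian's first comment. Changing the parallelism only moves key groups between subtasks, whereas changing maxParallelism changes the key-group step itself, which is why Till suggests it when two hot keys share a key group.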