Yes. A discussion with people who know this better than I do would be very welcome. Basically, I am trying to implement issue FLINK-1725 [1], which was abandoned in March 2017. Stephan Ewen said that other issues had to be fixed before this could be implemented, and I don't really know which ones they are.
[1] https://issues.apache.org/jira/browse/FLINK-1725

Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, Sep 23, 2019 at 3:47 PM Biao Liu <mmyy1...@gmail.com> wrote:

> Wow, that's really cool! You have indeed done a lot of work. IMO it's
> somewhat beyond the scope of the user group.
>
> Just one small concern: I'm not sure I fully understand your approach to
> "tackle data skew by altering the way Flink partitions keys using
> KeyedStream".
>
> From my understanding, key groups are used for rescaling jobs, e.g., for
> reusing state after changing the parallelism of an operator.
> I'm not sure whether you are heading in the right direction. It seems that
> you are implementing something deeper than the user interface. The user
> interface is stable, while the implementation is not. Usually it's not
> recommended to build a feature on top of implementation details.
>
> If you have strong reasons to change the implementation, I would suggest
> starting a discussion on the dev mailing list. Maybe it could be supported
> officially. What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> I've implemented a combiner [1] in Flink by extending
>> OneInputStreamOperator. I call my operator using "transform".
>> It works well, and I guess it would be useful to add this operator to
>> DataStream.java. I just need to check whether I have to touch other parts
>> of the source code.
>>
>> But now I want to tackle data skew by altering the way Flink partitions
>> keys using KeyedStream.
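To make Biao's point about key groups concrete, here is a simplified plain-Java model of how a keyed stream routes a key to a parallel instance. It is a sketch, not Flink's code: Flink applies a murmur hash to key.hashCode() before taking the modulo, while this model uses hashCode() directly (an assumption made to keep it dependency-free). The number of key groups equals the max parallelism, and contiguous ranges of key groups are assigned to instances via keyGroup * parallelism / maxParallelism, as in Flink's KeyGroupRangeAssignment. The class and method names are hypothetical.

```java
/**
 * Simplified model of key-group routing in a keyed stream.
 * Assumption: uses key.hashCode() directly; Flink additionally applies a
 * murmur hash before the modulo. Not Flink code.
 */
public class KeyGroupSketch {

    /** A key's key group is a fixed slot in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Contiguous ranges of key groups are owned by each parallel instance. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Instance that receives every record with this key. */
    static int instanceFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // The key group of "the" never changes; only the instance owning that
        // key group changes with the parallelism, which is what lets keyed
        // state be redistributed when a job is rescaled.
        for (int parallelism : new int[] {2, 3, 4}) {
            System.out.println("parallelism=" + parallelism
                + " key group=" + keyGroupFor("the", maxParallelism)
                + " instance=" + instanceFor("the", maxParallelism, parallelism));
        }
    }
}
```

Because every record with the same key maps to the same key group, a hot key always loads exactly one instance, whatever the parallelism. Changing this mapping touches what checkpointed state and rescaling rely on, which is why Biao suggests discussing it on the dev list.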
>>
>> [1] https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>>
>> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> If I understand correctly, you want to solve data skew caused by
>>> imbalanced keys?
>>>
>>> There is a common strategy for this kind of problem: pre-aggregation,
>>> like the combiner in MapReduce.
>>> But sadly, AFAIK Flink does not currently support pre-aggregation. I'm
>>> afraid you have to implement it yourself.
>>>
>>> For example, introduce a function that caches some data (time- or
>>> count-based). This function should be placed before "keyBy", on a
>>> non-keyed stream. It does pre-aggregation just like the aggregation after
>>> "keyBy" does. In this way, the skewed keyed data would be reduced a lot.
>>>
>>> I also found a suggestion [1] from Fabian, although it's from a long
>>> time ago.
>>>
>>> Hope it helps.
>>>
>>> [1] https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Thanks Biao,
>>>>
>>>> I see. To achieve what I want, I need to work with KeyedStream. I
>>>> downloaded the Flink source code to learn and adapt KeyedStream to my
>>>> needs. I am not sure, but it seems to be a lot of work because, as far as
>>>> I understand, the key groups have to be predictable [1], and altering this
>>>> touches a lot of other parts of the source code.
>>>>
>>>> However, if I can guarantee that they (the key groups) remain
>>>> predictable, I will be able to rebalance, rescale, ... the keys to other
>>>> worker nodes.
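Biao's pre-aggregation suggestion can be modelled in plain Java as a count-based buffer that merges (word, count) pairs per key and emits partial sums once a threshold is reached. This is a sketch under stated assumptions: in a Flink job this logic would sit in a function or operator before keyBy, and it would also need a time-based flush and checkpointed buffer state, both omitted here. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Count-based local pre-aggregation (a MapReduce-style combiner) modelled in
 * plain Java. Assumption: no timer-based flush and no checkpointing, both of
 * which a real Flink operator would need.
 */
public class PreAggregator {
    private final int flushThreshold;                  // records buffered before emitting
    private final Map<String, Integer> buffer = new HashMap<>();
    private int buffered = 0;

    PreAggregator(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    /** Adds one (word, count) record; returns partial sums once the threshold is hit. */
    List<Map.Entry<String, Integer>> add(String word, int count) {
        buffer.merge(word, count, Integer::sum);
        buffered++;
        return buffered >= flushThreshold ? flush() : List.of();
    }

    /** Emits one partial sum per distinct key and clears the buffer. */
    List<Map.Entry<String, Integer>> flush() {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : buffer.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        buffer.clear();
        buffered = 0;
        return out;
    }

    public static void main(String[] args) {
        PreAggregator combiner = new PreAggregator(5);
        String[] words = {"the", "the", "flink", "the", "the", "the", "skew", "the"};
        int emitted = 0;
        for (String w : words) {
            emitted += combiner.add(w, 1).size();
        }
        emitted += combiner.flush().size();  // final flush, e.g. at end of input
        System.out.println(words.length + " records in, " + emitted + " partial sums out");
    }
}
```

With a threshold of 5, the eight input records collapse into four partial sums, so the downstream keyBy/sum receives fewer records for the hot key "the"; the totals do not change because summing is associative.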
>>>>
>>>> [1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> The Flink job graph is a DAG. It seems that you set an "edge property"
>>>>> (the partitioner) several times on the same edge.
>>>>> Flink does not support multiple partitioners on one edge; the last one
>>>>> overrides the earlier ones. That means "keyBy" overrides both
>>>>> "rebalance" and "partitionByPartial".
>>>>>
>>>>> You could insert some nodes (operators) between these partitioners to
>>>>> satisfy your requirement. For example:
>>>>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> I am running a DataStream application that uses rebalance.
>>>>>> Basically, I am counting words using "src -> split ->
>>>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>>>> variants: one without a physical partitioning strategy, one with the
>>>>>> rebalance strategy [1], and one with the partial partitioning strategy
>>>>>> from [2].
>>>>>> I know that the keyBy operator effectively undoes what rebalance does,
>>>>>> because it splits the stream by key: if the keys are skewed, one
>>>>>> parallel instance of the operator after the keyBy will be overloaded.
>>>>>> However, I was expecting that *before the keyBy* I would have a
>>>>>> balanced stream, which is not happening.
>>>>>>
>>>>>> Basically, I want to see the difference in records/sec between
>>>>>> operators when I use rebalance or any other physical partitioning
>>>>>> strategy.
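The behaviour Felipe describes, together with Biao's explanation that keyBy replaces rebalance on the same edge, can be illustrated with a plain-Java model that counts how many records of a skewed stream each parallel instance receives under round-robin routing (what rebalance does) versus key-based routing (what keyBy does). Assumptions: key-based routing is approximated with Math.floorMod(hashCode, parallelism) rather than Flink's key groups, round-robin simply starts at instance 0, and all names are hypothetical.

```java
import java.util.Arrays;

/**
 * Compares per-instance load for a skewed stream under round-robin routing
 * (rebalance-like) and key-based routing (keyBy-like). Plain-Java model only.
 */
public class PartitionSkewDemo {

    /** Round-robin: each record goes to the next instance in turn, ignoring the key. */
    static int[] roundRobin(String[] keys, int parallelism) {
        int[] load = new int[parallelism];
        for (int i = 0; i < keys.length; i++) {
            load[i % parallelism]++;
        }
        return load;
    }

    /** By key: records with equal keys always go to the same instance. */
    static int[] byKey(String[] keys, int parallelism) {
        int[] load = new int[parallelism];
        for (String k : keys) {
            load[Math.floorMod(k.hashCode(), parallelism)]++;  // stand-in for key groups
        }
        return load;
    }

    /** 90 records of one hot key followed by 10 distinct cold keys. */
    static String[] skewedKeys() {
        String[] keys = new String[100];
        for (int i = 0; i < 90; i++) keys[i] = "hot";
        for (int i = 90; i < 100; i++) keys[i] = "cold-" + i;
        return keys;
    }

    public static void main(String[] args) {
        String[] keys = skewedKeys();
        int parallelism = 4;
        System.out.println("rebalance-like: " + Arrays.toString(roundRobin(keys, parallelism)));
        System.out.println("keyBy-like:     " + Arrays.toString(byKey(keys, parallelism)));
    }
}
```

Round-robin spreads the 100 records as [25, 25, 25, 25], while key-based routing puts all 90 "hot" records on a single instance. In the real job, a balanced rate would only be visible on an operator placed between rebalance() and keyBy(), such as the map(...) in Biao's example.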
>>>>>> However, I found no difference in the records/sec metrics of any
>>>>>> operator across the 3 physical partitioning strategies. Screenshots of
>>>>>> Prometheus+Grafana are attached.
>>>>>>
>>>>>> Maybe I am measuring the wrong operator, or maybe I am not using
>>>>>> rebalance the right way, or my use case is not a good one for testing
>>>>>> the rebalance transformation.
>>>>>> I am also testing a different physical partitioning strategy so that I
>>>>>> can later try to implement the issue "FLINK-1725 New Partitioner for
>>>>>> better load balancing for skewed data" [2]. I am not sure, but I guess
>>>>>> that all physical partitioning strategies have to be implemented on a
>>>>>> KeyedStream.
>>>>>>
>>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>
>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> // WordSource and Tokenizer are my own classes
>>>>>> DataStream<String> text = env.addSource(new WordSource());
>>>>>> // split lines into words
>>>>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
>>>>>> // choose ONE partitioning strategy per run
>>>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;  // 1) none
>>>>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();  // 2) round-robin
>>>>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);  // 3) from FLINK-1725 [2], not in the standard DataStream API
>>>>>> // count
>>>>>> partitionedStream.keyBy(0).sum(1).print();
>>>>>> env.execute("WordCount with physical partitioning");
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>>>>
>>>>>> Thanks,
>>>>>> Felipe
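FLINK-1725 asks for a partitioner that balances skewed keys; as far as I know, the proposal attached to it is based on partial key grouping, where each key has two candidate instances and each record goes to the less loaded one. Below is a plain-Java sketch of that idea under stated assumptions: it is not the patch from the issue and not a Flink Partitioner, the two hash functions are ad hoc, the second candidate is forced to differ from the first, load is counted locally by a single sender, and parallelism must be at least 2. The class and method names are hypothetical.

```java
import java.util.Arrays;

/**
 * Plain-Java sketch of partial key grouping ("power of two choices").
 * Assumptions: ad hoc hash functions, loads tracked locally by one sender,
 * parallelism >= 2. Not Flink code and not the FLINK-1725 patch.
 */
public class PartialKeyGroupingSketch {
    private final long[] load;  // records this sender has routed to each instance

    PartialKeyGroupingSketch(int parallelism) {
        if (parallelism < 2) {
            throw new IllegalArgumentException("need at least 2 instances");
        }
        this.load = new long[parallelism];
    }

    /** Routes one record to the less loaded of the key's two candidate instances. */
    int route(String key) {
        int p = load.length;
        int h1 = key.hashCode();
        int h2 = Integer.reverse(h1) ^ 0x5bd1e995;  // ad hoc second hash, illustrative only
        int first = Math.floorMod(h1, p);
        // an offset in [1, p - 1] guarantees the second candidate differs from the first
        int second = (first + 1 + Math.floorMod(h2, p - 1)) % p;
        int target = load[first] <= load[second] ? first : second;
        load[target]++;
        return target;
    }

    /** Copy of the per-instance record counts. */
    long[] loads() {
        return load.clone();
    }

    public static void main(String[] args) {
        PartialKeyGroupingSketch pkg = new PartialKeyGroupingSketch(4);
        for (int i = 0; i < 90; i++) pkg.route("hot");
        for (int i = 0; i < 10; i++) pkg.route("cold-" + i);
        // the 90 "hot" records are split between two instances instead of one
        System.out.println("loads: " + Arrays.toString(pkg.loads()));
    }
}
```

Because a hot key is split across two instances, each instance holds only a partial count, so a second per-key aggregation step is needed for final results. To try this in a job, the routing could be wrapped in a `Partitioner` and passed to `DataStream#partitionCustom`; note that `partitionCustom` returns a plain DataStream rather than a KeyedStream, and per Biao's explanation a keyBy directly after it would replace the custom partitioner on that edge.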