Hello flink-dev,

The aim is to achieve data locality for source connectors and for (in our case,
dynamic) task/operator scaling.
Current behavior (Flink 1.8): some_hash(key_hash) % parallelism, i.e. a fixed
hash-modulo distribution that the user cannot influence.
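For reference, a simplified sketch of how I read the 1.8 assignment path (the
real code is in KeyGroupRangeAssignment and uses MathUtils.murmurHash; mix()
below is only a stand-in, not Flink's hash):

// Simplified sketch of the Flink 1.8 assignment path, not the exact runtime code.
// The real logic lives in org.apache.flink.runtime.state.KeyGroupRangeAssignment
// and uses MathUtils.murmurHash; mix() below is only a stand-in for that hash.
public final class CurrentAssignmentSketch {

    // key -> key group: a fixed hash, nothing the user can plug into
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return (mix(key.hashCode()) & 0x7fffffff) % maxParallelism;
    }

    // key group -> operator (subtask) index
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism,
                                        int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // stand-in integer mixer (murmur3 finalizer), not Flink's MathUtils.murmurHash
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        int keyGroup = assignToKeyGroup("user-42", maxParallelism);
        System.out.println("key group " + keyGroup + " -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
    }
}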
Proposal: let the user control the key-group distribution at the DataStream
key level by implementing a simple interface; please find attached a diff
(against Flink 1.8.0) and the auxiliary Java interface class. By implementing
this interface one can manage key-group assignment directly. It is very
flexible and requires no significant runtime changes.
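To make the idea concrete right in the mail body (the attached class is the
authoritative one; the name and signature below are only illustrative), the
interface could look roughly like this:

// Illustrative only -- the attached java interface class is the actual
// proposal; the name and signature here are just a sketch of the idea.
public interface KeyGroupAssigner<K> extends java.io.Serializable {

    /**
     * Maps a key to a key group for the configured max parallelism.
     * Returning a value in [0, maxParallelism) lets the user pin related keys
     * (e.g. keys already co-located in one Kafka partition) to the same
     * key group and therefore the same subtask.
     */
    int assignKeyGroup(K key, int maxParallelism);
}

A default implementation could simply reproduce today's hash-based behavior,
so jobs that do not care about placement would see no change.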
For us this gives a speedup when reading from a Kafka source that is already
grouped by key (custom topic-partition distribution): there is no need to
reshuffle, and the topic-partition reader can be chained with the next local
operator (see the user mailing list thread [1] and the issue [2]).
It can also help when parallelism changes (rescaling): globally, when the whole
job is stopped and then restored with a different parallelism value, and within
a job, e.g. operator1().parallelism(4) followed by operator2().parallelism(6).
In this scenario the current (Flink 1.8) realization moves data across all task
nodes (to be precise, 5/6 of the data is moved). But if we keep the current
group assignment (in the key object, with old_parallelism = 4) and we know the
new value, new_parallelism = 6, we only need to move a part of the keys: 2/6
from each node, so the total (network, disk, memory) traffic would be 2.5 times
smaller, see the tables below:
Old parallelism = 4, 24 different keys
Node | keys
0 | 00, 04, 08, 12, 16, 20
1 | 01, 05, 09, 13, 17, 21
2 | 02, 06, 10, 14, 18, 22
3 | 03, 07, 11, 15, 19, 23

New parallelism = 6, same keys
Node | keys
0 | 00, 04, 12, 16
1 | 01, 05, 13, 17
2 | 02, 06, 14, 18
3 | 03, 07, 15, 19
4 | 08, 09, 10, 11
5 | 20, 21, 22, 23
Take every third key from each old node (0..3): the first such key goes to the
new node 4, the next to node 5, the next to node 4 again, and so on; only those
keys are transferred. A minimal sketch of one possible "sticky" rule follows
below.
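To back the 2/6 figure with something runnable, here is that sketch, assuming
(as in the proposal) that the key carries its old subtask index; the attached
diff may realize it differently:

// Sketch only: one possible "sticky" rule for scaling up from oldP to newP
// subtasks (newP > oldP). This is NOT the attached diff, just an illustration
// of why only (newP - oldP) / newP of the keys have to be transferred.
public final class StickyRescaleSketch {

    // A key keeps its old subtask unless its hash lands on one of the newly
    // added subtasks under the new layout. Assumes the old subtask index is
    // known (in the proposal it is carried in the key object).
    static int newIndex(int keyHash, int oldIndex, int oldP, int newP) {
        int candidate = (keyHash & 0x7fffffff) % newP;
        return candidate >= oldP ? candidate : oldIndex;
    }

    public static void main(String[] args) {
        int oldP = 4, newP = 6, keys = 6000, moved = 0;
        for (int key = 0; key < keys; key++) {
            int oldIndex = key % oldP;          // old assignment (demo hash = key)
            if (newIndex(key, oldIndex, oldP, newP) != oldIndex) {
                moved++;
            }
        }
        // prints 2000 of 6000, i.e. the 2/6 from the tables above
        System.out.printf("moved %d of %d keys (%.2f)%n",
                moved, keys, moved / (double) keys);
    }
}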
The gain is not only less data copied but also (depending on the realization,
of course) fewer clearing operations on the Java side (no need to resend all
the data, store it in a new heap and clear all of the previous state), less
garbage collection and memory pressure, and there are other areas left to
tune/improve.
In addition: yes, the network is fast, 10GbE or more, but in reality there are
a number of virtualization layers (OpenStack, VMware, etc.) with a finite
TCP/IP stack, so the real throughput and latency differ from what the
manufacturer declares.

Does this approach correspond with the further evolution of Flink? Could it be
merged into 1.8 or a later version?

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/kafka-partitions-data-locality-td27355.html
[2] https://issues.apache.org/jira/browse/FLINK-12294


Best, Sergey
