The keyBy argument is a deterministic function, so under the same max
parallelism the key group is always the same for the same key.

My goal here is to distribute keys evenly among operators even with different
parallelism. I implement the mapping in a different way, by an exhaustive
search for a special salt key.
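
A minimal sketch of such a search (the parallelism values, the salt length and
all names are illustrative assumptions, not my actual script):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

import scala.util.Random

object SaltSearch {
  // Illustrative values only; in a real job they come from the configuration.
  val MaxParallelism = 16
  val Parallelism = 4

  // Operator index that a salted key-group id would be routed to.
  def operatorFor(salt: String, keyGroup: Int): Int =
    KeyGroupRangeAssignment.assignKeyToParallelOperator(
      s"$salt$keyGroup", MaxParallelism, Parallelism)

  // A salt is "even" if every operator receives the same number of key groups
  // (this assumes Parallelism divides MaxParallelism).
  def isEven(salt: String): Boolean = {
    val groupsPerOperator = (0 until MaxParallelism)
      .groupBy(kg => operatorFor(salt, kg))
      .values.map(_.size).toSeq
    groupsPerOperator.size == Parallelism && groupsPerOperator.distinct.size == 1
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random()
    val salt = Iterator
      .continually(rnd.alphanumeric.take(5).mkString.toLowerCase)
      .find(isEven)
      .get
    println(s"Found salt: $salt")
  }
}

A salt found this way can then be hard-coded, as KEY_RAND_SALT is in the code
further down the thread.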

In my understanding of the implementation in the gist, the mapping is almost
even for one parallelism, and the key group is stable for every key, but
changing the parallelism could be a problem.

On Nov 4, 2021, 3:25 PM +0800, Yuval Itzchakov <yuva...@gmail.com>, wrote:
> Thank you Schwalbe, David and Naitong for your answers!
>
> David: This is what we're currently doing ATM, and I wanted to know if
> there's any simplified approach to this. This is what we have so far:
> https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
>
> Naitong: The keyBy internally will rehash the key you provide it. How do you
> make sure that the re-hashed key is still in the desired key group range?
> Schwalbe:
> • Assuming that all your 4 different keys are evenly distributed, and you
> send them to (only) 3 buckets, you would expect at least one bucket to cover
> 2 of your keys, hence the 50% - You're right, this is the desired behavior I
> actually want. I don't want them to be truly uniformly distributed, as I
> want to batch multiple keys together in the same bucket.
> • With low entropy keys avoiding data skew is quite difficult - I
> understand, and we are well aware of the implications.
> • But your situation could be worse: all 4 keys could end up in the same
> bucket if the hash function in use happens to generate collisions for the 4
> keys, in which case 2 of your 3 buckets would not process any events … this
> could also lead to watermarks not progressing … - We take care of this
> internally, as we understand there may be skewing of the buckets. I don't
> care about watermarks at this stage.
> • There are two proposals on how to improve the situation:
>   • Use the same parallelism and max parallelism for the relevant operators
>   and implement a manual partitioner. A manual partitioner is also good in
>   situations where you want to lower the bias and you know the distribution
>   of your key space exactly and can rearrange keys to even out the numbers -
>   I looked into custom partitioning, but it seems not to work with a
>   KeyedDataStream, and I need the distribution to be performed when keying
>   the stream.
>   • More sophisticated (if possible), divide-and-conquer like - Interesting
>   idea, but I'm not sure I follow. Could you possibly provide a sketch of
>   the transformations on the stream?
>     • Key by your ‘small’ key plus some arbitrary attribute with higher
>     entropy
>     • Window aggregate first on that artificial key
>     • Aggregate the results on your original ‘small’ key
>     • This could be interesting for high-throughput situations where you
>     actually want to run at a parallelism higher than the number of
>     different ‘small’ keys (a rough sketch of this two-stage approach
>     follows below)
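>
> For reference, a minimal sketch of the two-stage ("divide and conquer")
> aggregation described above, assuming a simple Event(smallKey, weight) type
> and a sum-style pre-aggregation; the type, the window sizes and the sub-key
> range are illustrative assumptions, not part of the actual job:
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
>
> case class Event(smallKey: Int, weight: Long)
>
> def twoStageAggregate(events: DataStream[Event]): DataStream[Event] = {
>   // Attach a higher-entropy sub-key so the first keyBy spreads the load.
>   val salted: DataStream[(Int, Event)] =
>     events.map(e => (scala.util.Random.nextInt(16), e))
>
>   salted
>     // Stage 1: pre-aggregate on (small key, sub-key).
>     .keyBy(t => (t._2.smallKey, t._1))
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>     .reduce((a, b) => (a._1, Event(a._2.smallKey, a._2.weight + b._2.weight)))
>     .map(_._2)
>     // Stage 2: merge the partial aggregates on the original small key.
>     .keyBy(_.smallKey)
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>     .reduce((a, b) => Event(a.smallKey, a.weight + b.weight))
> }
>
> Attaching the random sub-key in a map, rather than inside the key selector,
> keeps the key selector deterministic, which Flink requires.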
>
> On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao <xiaonait...@gmail.com> wrote:
> > I think I had a similar scenario several months ago, here is my related 
> > code:
> >
> > import org.apache.flink.runtime.state.KeyGroupRangeAssignment
> >
> > val MAX_PARALLELISM = 16
> > val KEY_RAND_SALT = "73b46"
> >
> > logSource.keyBy { value =>
> >   val keyGroup =
> >     KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
> >   s"$KEY_RAND_SALT$keyGroup"
> > }
> >
> > The keyGroup is just like your bucket id, and the KEY_RAND_SALT was
> > generated by a script that maps the bucket ids evenly to operators under
> > the max parallelism.
> >
> > On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <yuva...@gmail.com>, wrote:
> > > Hi,
> > > I have a use-case where I'd like to partition a KeyedDataStream a bit 
> > > differently than how Flinks default partitioning works with key groups.
> > >
> > > <image.png>
> > > What I'd like to be able to do is take all my data and split it up evenly
> > > between 3 buckets, which will store the data in state. Using the key above
> > > works, but it splits the data unevenly between the different key groups,
> > > as the key space is usually very small (0 - 3). What ends up happening is
> > > that sometimes 50% of the keys end up on the same operator index, whereas
> > > ideally I'd like to distribute them evenly across all operator indexes in
> > > the cluster.
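> > >
> > > For example, one can check where the default routing would send such a
> > > small key space; the snippet below is illustrative only and assumes the
> > > default max parallelism of 128 and a parallelism of 3:
> > >
> > > import org.apache.flink.runtime.state.KeyGroupRangeAssignment
> > >
> > > (0 to 3).foreach { key =>
> > >   val operator =
> > >     KeyGroupRangeAssignment.assignKeyToParallelOperator(key, 128, 3)
> > >   println(s"key $key -> operator $operator")
> > > }
> > >
> > > // With only 4 distinct keys, two of them can easily land on the same
> > > // operator index, which is the skew described above.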
> > >
> > > Is there any way of doing this?
> > > --
> > > Best Regards,
> > > Yuval Itzchakov.
>
>
> --
> Best Regards,
> Yuval Itzchakov.
