Thank you Schwalbe, David and Naitong for your answers!
*David*: This is what we're currently doing, and I wanted to know if
there's a simpler approach. This is what we have so far:
https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
*Naitong*: keyBy will internally re-hash the key you provide it. How do
you make sure that the re-hashed key still lands in the desired key group
range?
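To make the concern concrete, here's a minimal sketch of the double
hashing I'm worried about (the salt and max parallelism values are
hypothetical, mirroring your example below):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

val maxParallelism = 16
val salt = "73b46" // hypothetical salt value

(0 until 4).foreach { bucket =>
  // The synthetic key we would hand to keyBy(...)
  val syntheticKey = s"$salt$bucket"
  // ...but keyBy re-hashes that key, so the key group it actually lands
  // in is derived from the synthetic key, not from `bucket` itself:
  val actualGroup =
    KeyGroupRangeAssignment.assignToKeyGroup(syntheticKey, maxParallelism)
  println(s"bucket $bucket -> key group $actualGroup")
}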
*Schwalbe*:
- Assuming that all your 4 different keys are evenly distributed, and
you send them to (only) 3 buckets, you would expect at least one bucket to
cover 2 of your keys, hence the 50% - You're right, this is the desired
behavior. I don't want the keys to be truly uniformly distributed, as I
want to batch multiple keys together in the same bucket.
- With low entropy keys avoiding data skew is quite difficult - I
understand, and we are well aware of the implications.
- But your situation could be worse, all 4 keys could end up in the same
bucket, if the hash function in use happens to generate collisions for the
4 keys, in which case 2 of your 3 buckets would not process any events …
this could also lead to watermarks not progressing … - We take care of
this internally, as we understand the buckets may end up skewed. I don't
care about watermarks at this stage.
- There are two proposals for improving the situation:
  - Use the same parallelism and max parallelism for the relevant
  operators and implement a manual partitioner
    - A manual partitioner is also good in situations where you want
    to lower the bias, know the distribution of your key space exactly,
    and can rearrange keys to even out the numbers - I looked into
    custom partitioning, but it doesn't seem to work with a
    KeyedDataStream, and I need the distribution to be performed when
    keying the stream (see the partitionCustom sketch below this list).
  - More sophisticated (if possible), divide-and-conquer like -
  Interesting idea, but I'm not sure I follow. Could you possibly provide
  a sketch of the transformations on the stream? (I've put my rough
  understanding below this list - please correct me if I misread it.)
    - Key by your ‘small’ key plus some arbitrary attribute with
    higher entropy
    - Window aggregate first on that artificial key
    - Aggregate the results on your original ‘small’ key
    - This could be interesting for high-throughput situations where
    you actually want to run at a parallelism higher than the number
    of different ‘small’ keys
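On the manual partitioner: below is a minimal sketch of what I tried
(Event and its fields are hypothetical). It shows the limitation I ran
into - partitionCustom distributes events exactly as told, but returns a
plain DataStream rather than a KeyedDataStream, so keyed state is not
available downstream:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

case class Event(bucket: Int, payload: String) // hypothetical event type

// A manual partitioner: routes each low-entropy key directly to a
// downstream channel, no hashing involved.
val bucketPartitioner = new Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    key % numPartitions
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[Event] =
  env.fromElements(Event(0, "a"), Event(1, "b"), Event(2, "c"), Event(3, "d"))

// Evenly distributed across subtasks, but the result is not keyed.
val partitioned: DataStream[Event] =
  events.partitionCustom(bucketPartitioner, _.bucket)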
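And for the divide-and-conquer proposal, here is my rough understanding
as a sketch (types, attribute names, and window sizes are hypothetical):
key first by the 'small' key plus a salt derived from a higher-entropy
attribute, pre-aggregate, then aggregate again on the 'small' key alone.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Metric(smallKey: Int, deviceId: String, value: Long) // hypothetical

val env = StreamExecutionEnvironment.getExecutionEnvironment
val metrics: DataStream[Metric] = env.fromElements(
  Metric(0, "dev-a", 1L), Metric(1, "dev-b", 2L), Metric(0, "dev-c", 3L))

val fanOut = 8 // number of artificial sub-keys per 'small' key

// Stage 1: pre-aggregate on (smallKey, salt); the salt comes from a
// higher-entropy attribute, so each small key is spread over up to
// `fanOut` parallel subtasks.
val preAggregated = metrics
  .keyBy(m => (m.smallKey, math.abs(m.deviceId.hashCode) % fanOut))
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .reduce((a, b) => Metric(a.smallKey, a.deviceId, a.value + b.value))

// Stage 2: combine the partial aggregates on the original small key.
val aggregated = preAggregated
  .keyBy(_.smallKey)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .reduce((a, b) => Metric(a.smallKey, a.deviceId, a.value + b.value))

Is that roughly what you had in mind?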
On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao <[email protected]> wrote:
> I think I had a similar scenario several months ago; here is my related
> code:
>
> val MAX_PARALLELISM = 16
> val KEY_RAND_SALT = "73b46"
>
> logSource.keyBy { value =>
>   val keyGroup =
>     KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
>   s"$KEY_RAND_SALT$keyGroup"
> }
>
> The keyGroup is just like your bucket id, and the KEY_RAND_SALT was
> generated by some script to map bucket ids evenly to operators under the
> max parallelism.
>
> On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <[email protected]>, wrote:
>
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
> [image omitted]
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as the key space is usually very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, whereas
> ideally I'd like to distribute them evenly across all operator indexes in
> the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.
>
>
--
Best Regards,
Yuval Itzchakov.