Re: Custom partitioning of keys with keyBy

2021-11-04 Thread naitong Xiao
The keyBy argument function is deterministic for a given max parallelism, 
which makes sure the key group is always the same for the same key.

My goal here is to distribute keys evenly among operators even with 
different parallelism settings. I implement the mapping in a different way, by an 
exhaustive search for a special salt key.

In my understanding of the implementation in the gist, the mapping is almost 
even for a given parallelism, and the key group is stable for all keys. But 
changing the parallelism could be a problem.
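
For illustration, a minimal sketch of what such a salt search could look like, using 
Flink's KeyGroupRangeAssignment utilities (the hex candidate generation and the 
balance check below are my own assumptions, not the actual script):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

// Illustrative sketch only: exhaustively try candidate (hex) salts until the
// derived keys s"$salt$bucket", one per bucket id, spread evenly over the
// operator indices at the target parallelism.
object FindSalt {
  def operatorIndexFor(key: String, maxParallelism: Int, parallelism: Int): Int = {
    val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
    KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)
  }

  def findSalt(maxParallelism: Int, parallelism: Int): String =
    Iterator.from(0).map(i => f"$i%x").find { salt =>
      val counts = (0 until maxParallelism)
        .map(bucket => operatorIndexFor(s"$salt$bucket", maxParallelism, parallelism))
        .groupBy(identity).values.map(_.size)
      // every operator index is hit, and the bucket counts differ by at most one
      counts.size == parallelism && counts.max - counts.min <= 1
    }.get

  def main(args: Array[String]): Unit =
    println(findSalt(maxParallelism = 16, parallelism = 4))
}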

Sent with a Spark
On Nov 4, 2021, 3:25 PM +0800, Yuval Itzchakov , wrote:
> Thank you Schwalbe, David and Naitong for your answers!
>
> David: This is what we're currently doing ATM, and I wanted to know if 
> there's any simplified approach to this. This is what we have so far: 
> https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
> Naitong: The keyBy internally will rehash the key you provide it. How do you 
> make sure that the re-hashed key is still in the desired key group range?
> Schwalbe:
> • Assuming that all your 4 different keys are evenly distributed, and you 
>   send them to (only) 3 buckets, you would expect at least one bucket to cover 
>   2 of your keys, hence the 50% - You're right, this is the desired behavior I 
>   actually want; I don't want them to be truly uniformly distributed, as I want 
>   to batch multiple keys together in the same bucket.
> • With low entropy keys avoiding data skew is quite difficult - I understand, 
>   and we are well aware of the implications.
> • But your situation could be worse: all 4 keys could end up in the same 
>   bucket if the hash function in use happens to generate collisions for the 4 
>   keys, in which case 2 of your 3 buckets would not process any events … this 
>   could also lead to watermarks not progressing … - We take care of this 
>   internally as we understand there may be skewing to the buckets. I don't 
>   care about watermarks at this stage.
> • There are two proposals on how to improve the situation:
>   • Use the same parallelism and max parallelism for the relevant operators 
>     and implement a manual partitioner
>     • A manual partitioner is also good in situations where you want to lower 
>       the bias, you know the distribution of your key space exactly, and you 
>       can rearrange keys to even out the numbers - I looked into custom 
>       partitioning, but it does not seem to work with a KeyedDataStream, and I 
>       need the distribution to be performed when keying the stream.
>   • More sophisticated (if possible), divide-and-conquer like - Interesting 
>     idea, but I'm not sure I follow. Could you possibly provide a sketch of 
>     the transformations on the stream?
>     • Key by your ‘small’ key plus some arbitrary attribute with higher entropy
>     • Window aggregate first on that artificial key
>     • Aggregate the results on your original ‘small’ key
>     • This could be interesting for high-throughput situations where you 
>       actually want to run at a parallelism higher than the number of 
>       different ‘small’ keys
>
> On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao  wrote:
> > I think I had a similar scenario several months ago, here is my related 
> > code:
> >
> > val MAX_PARALLELISM = 16
> > val KEY_RAND_SALT = "73b46"
> >
> > logSource.keyBy{ value =>
> >  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, 
> > MAX_PARALLELISM)
> >  s"$KEY_RAND_SALT$keyGroup"
> > }
> >
> > The keyGroup is just like your bucket id,  and the KEY_RAND_SALT was 
> > generated by some script to map bucket id evenly to operators under the max 
> > parallelism.
> >
> > Sent with a Spark
> > On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov , wrote:
> > > Hi,
> > > I have a use-case where I'd like to partition a KeyedDataStream a bit 
> > > differently than how Flink's default partitioning works with key groups.
> > >
> > > 
> > > What I'd like to be able to do is take all my data and split it up evenly 
> > > between 3 buckets which will store the data in the state. Using the key 
> > > above works, but splits the data unevenly between the different key 
> > > groups, as usually the key space is very small (0 - 3). What ends up 
> > > happening is that sometimes 50% of the keys end up on the same operator 
> > > index, where ideally I'd like to distribute it evenly between all 
> > > operator indexes in the cluster.
> > >
> > > Is there any way of doing this?
> > > --
> > > Best Regards,
> > > Yuval Itzchakov.
>
>
> --
> Best Regards,
> Yuval Itzchakov.


RE: Custom partitioning of keys with keyBy

2021-11-04 Thread Schwalbe Matthias
Hi Yuval,

… I had to do some guesswork with regard to your use case … it is still not 
exactly clear what you want to achieve; however, I remember having done something 
similar in that area 2 years ago.
Unfortunately I cannot find the implementation anymore ☹


  *   If you tried a combination of .partitionCustom() and 
reinterpretAsKeyedStream(): this will fail, because reinterpretAsKeyedStream() 
forces a ForwardPartitioner.
  *   You could still model your code after the implementation of 
reinterpretAsKeyedStream and use your own partitioner instead [1] (a rough 
sketch of the custom partitioner part is shown after this list)
  *   Partitioning is relevant in two places:
 *   The outgoing Transform for selection of the output channel
 *   The incoming Transform for selecting the correct key range for state 
primitives
 *   You need to make sure that both sides agree
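
For illustration, a minimal sketch of the custom partitioner half, using only the 
stable partitionCustom API (the Event type and bucketId field are hypothetical; 
keeping keyed state would additionally require mirroring the 
reinterpretAsKeyedStream wiring from [1] with this partitioner in place of the 
ForwardPartitioner):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object ManualPartitioningSketch {
  // Hypothetical event type, just for illustration.
  case class Event(bucketId: Int, payload: String)

  // Spread the small bucket-id space (e.g. 0-3) round-robin over the
  // downstream channels.
  class BucketPartitioner extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
  }

  // Note: the result is a plain DataStream, not a KeyedStream, so keyed state
  // and timers are not directly available on it; see the remarks above.
  def partitionByBucket(events: DataStream[Event]): DataStream[Event] =
    events.partitionCustom(new BucketPartitioner, (e: Event) => e.bucketId)
}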

… for the last question regarding the more sophisticated scenario … please give 
me a little more time for a sketch … I also want to understand your use case a 
little better

Hope this helps

Thias





[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java#L185-L210

From: Yuval Itzchakov 
Sent: Thursday, 4 November 2021 08:25
To: naitong Xiao 
Cc: user 
Subject: Re: Custom partitioning of keys with keyBy

Thank you Schwalbe, David and Naitong for your answers!

David: This is what we're currently doing ATM, and I wanted to know if there's 
any simplified approach to this. This is what we have so far: 
https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
Naitong: The keyBy internally will rehash the key you provide it. How do you 
make sure that the re-hashed key is still in the desired key group range?
Schwalbe:

  *   Assuming that all your 4 different keys are evenly distributed, and you 
send them to (only) 3 buckets, you would expect at least one bucket to cover 2 
of your keys, hence the 50% - You're right, this is the desired behavior I 
actually want; I don't want them to be truly uniformly distributed, as I want 
to batch multiple keys together in the same bucket.
  *   With low entropy keys avoiding data skew is quite difficult - I 
understand, and we are well aware of the implications.
  *   But your situation could be worse, all 4 keys could end up in the same 
bucket, if the hash function in use happens to generate collisions for the 4 
keys, in which case 2 of your 3 buckets would not process any events … this 
could also lead to watermarks not progressing … - We take care of this 
internally as we understand there may be skewing to the buckets. I don't care 
about watermarks at this stage.
  *   There are two proposals on how to improve the situation:

 *   Use the same parallelism and max parallelism for the relevant 
operators and implement a manual partitioner

*   A manual partitioner is also good in situations where you want to 
lower the bias, you know the distribution of your key space exactly, and you can 
rearrange keys to even out the numbers - I looked into custom partitioning, but 
it does not seem to work with a KeyedDataStream, and I need the distribution to 
be performed when keying the stream.

 *   More sophisticated (if possible), divide-and-conquer like - 
Interesting idea, but I'm not sure I follow. Could you possibly provide a 
sketch of the transformations on the stream?

*   Key by your ‘small’ key plus some arbitrary attribute with higher 
entropy
*   Window aggregate first on that artificial key
*   Aggregate the results on your original ‘small’ key
*   This could be interesting for high-throughput situations where you 
actually want to run at a parallelism higher than the number of different ‘small’ 
keys


On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao 
<xiaonait...@gmail.com> wrote:
I think I had a similar scenario several months ago, here is my related code:

val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"

logSource.keyBy{ value =>
 val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, 
MAX_PARALLELISM)
 s"$KEY_RAND_SALT$keyGroup"
}

The keyGroup is just like your bucket id,  and the KEY_RAND_SALT was generated 
by some script to map bucket id evenly to operators under the max parallelism.

Sent with a Spark
On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov 
<yuva...@gmail.com>, wrote:

Hi,
I have a use-case where I'd like to partition a KeyedDataStream a bit 
differently than how Flink's default partitioning works with key groups.


What I'd like to be able to do is take all my data and split it up evenly 
between 3 buckets which will store the data in the state. Using the key above 
works, but splits the data unevenly between the different key groups, as 
usually the key space is very small (0 - 3). What ends up happening is that 
sometimes 50% of the keys end up on the same operator index, where ideally I'd 
like to distribute it evenly between all operator indexes in the cluster.

Re: Custom partitioning of keys with keyBy

2021-11-04 Thread Yuval Itzchakov
Thank you Schwalbe, David and Naitong for your answers!

*David*: This is what we're currently doing ATM, and I wanted to know if
there's any simplified approach to this. This is what we have so far:
https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
*Naitong*: The keyBy internally will rehash the key you provide it. How do
you make sure that the re-hashed key is still in the desired key group
range?
*Schwalbe*:

   - Assuming that all your 4 different keys are evenly distributed, and
   you send them to (only) 3 buckets, you would expect at least one bucket to
   cover 2 of your keys, hence the 50% - You're right, this is the desired
   behavior I actually want; I don't want them to be truly uniformly
   distributed, as I want to batch multiple keys together in the same bucket.
   - With low entropy keys avoiding data skew is quite difficult - I
   understand, and we are well aware of the implications.
   - But your situation could be worse, all 4 keys could end up in the same
   bucket, if the hash function in use happens to generate collisions for the
   4 keys, in which case 2 of your 3 buckets would not process any events …
   this could also lead to watermarks not progressing … - We take care of
   this internally as we understand there may be skewing to the buckets. I
   don't care about watermarks at this stage.
   - There are two proposals on how to improve the situation:
  - Use the same parallelism and max parallelism for the relevant
  operators and implement a manual partitioner
  - A manual partitioner is also good in situations where you want
  to lower the bias, you know the distribution of your key space
  exactly, and you can rearrange keys to even out the numbers - I
  looked into custom partitioning, but it does not seem to work with
  KeyedDataStream, and I need the distribution to be performed when
  keying the stream.
  - More sophisticated (if possible), divide-and-conquer like -
  Interesting idea, but I'm not sure I follow. Could you possibly provide a
  sketch of the transformations on the stream?
  - Key by your ‘small’ key plus some arbitrary attribute with
 higher entropy
 - Window aggregate first on that artificial key
 - Aggregate the results on your original ‘small’ key
  - This could be interesting for high-throughput situations where
  you actually want to run at a parallelism higher than the number
  of different ‘small’ keys



On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao  wrote:

> I think I had a similar scenario several months ago, here is my related
> code:
>
> val MAX_PARALLELISM = 16
> val KEY_RAND_SALT = "73b46"
>
> logSource.keyBy{ value =>
>  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid,
> MAX_PARALLELISM)
>  s"$KEY_RAND_SALT$keyGroup"
> }
>
> The keyGroup is just like your bucket id,  and the KEY_RAND_SALT was
> generated by some script to map bucket id evenly to operators under the max
> parallelism.
>
> Sent with a Spark 
> On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov , wrote:
>
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
> 
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as usually the key space is very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute it evenly between all operator indexes in
> the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.
>
>

-- 
Best Regards,
Yuval Itzchakov.


Re: Custom partitioning of keys with keyBy

2021-11-03 Thread naitong Xiao
I think I had a similar scenario several months ago, here is my related code:

val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"

logSource.keyBy{ value =>
 val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, 
MAX_PARALLELISM)
 s"$KEY_RAND_SALT$keyGroup"
}

The keyGroup is just like your bucket id,  and the KEY_RAND_SALT was generated 
by some script to map bucket id evenly to operators under the max parallelism.

Sent with a Spark
On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov , wrote:
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit 
> differently than how Flink's default partitioning works with key groups.
>
> 
> What I'd like to be able to do is take all my data and split it up evenly 
> between 3 buckets which will store the data in the state. Using the key above 
> works, but splits the data unevenly between the different key groups, as 
> usually the key space is very small (0 - 3). What ends up happening is that 
> sometimes 50% of the keys end up on the same operator index, where ideally 
> I'd like to distribute it evenly between all operator indexes in the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.


Re: Custom partitioning of keys with keyBy

2021-11-03 Thread David Anderson
Another possibility, if you know in advance the values of the keys, is to
find a mapping that transforms the original keys into new keys that will,
in fact, end up in disjoint key groups that will, in turn, be assigned to
different slots (given a specific parallelism). This is ugly, but feasible.

For reference, the key group for a given key is

MathUtils.murmurHash(key.hashCode()) % maxParallelism

and a given key group will be assigned to the slot computed by

keyGroup * actualParallelism / maxParallelism
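
For illustration, a minimal sketch of searching for such a mapping with these two 
formulas, assuming the original keys are known up front (the surrogate key format 
and the brute-force search are illustrative only):

import org.apache.flink.util.MathUtils

// Illustrative sketch: for each known original key, brute-force a surrogate
// string key that lands on the desired operator index, using the two formulas
// above; then keyBy the surrogate instead of the original key.
object DisjointKeyMapping {
  def operatorIndexFor(key: Any, maxParallelism: Int, parallelism: Int): Int = {
    val keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism
    keyGroup * parallelism / maxParallelism
  }

  def buildMapping(originalKeys: Seq[Int], maxParallelism: Int, parallelism: Int): Map[Int, String] =
    originalKeys.zipWithIndex.map { case (key, i) =>
      val target = i % parallelism                     // desired operator for this key
      val surrogate = Iterator.from(0)
        .map(n => s"$key-$n")                          // candidate surrogate keys
        .find(s => operatorIndexFor(s, maxParallelism, parallelism) == target)
        .get
      key -> surrogate
    }.toMap
}

// Usage would then be stream.keyBy(e => mapping(e.originalKey)) instead of
// stream.keyBy(e => e.originalKey).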

David



On Wed, Nov 3, 2021 at 3:35 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Yuval,
>
>
>
> Just a couple of comments:
>
>
>
>- Assuming that all your 4 different keys are evenly distributed, and
>you send them to (only) 3 buckets, you would expect at least one bucket to
>cover 2 of your keys, hence the 50%
>- With low entropy keys avoiding data skew is quite difficult
>- But your situation could be worse, all 4 keys could end up in the
>same bucket, if the hash function in use happens to generate collisions for
>the 4 keys, in which case 2 of your 3 buckets would not process any events
>… this could also lead to watermarks not progressing …
>   - There are two proposals on how to improve the situation:
>   - Use the same parallelism and max parallelism for the relevant
>   operators and implement a manual partitioner
>  - A manual partitioner is also good in situations where you want
>  to lower the bias, you know the distribution of your key space
>  exactly, and you can rearrange keys to even out the numbers
>   - More sophisticated (if possible), divide-and-conquer like:
>  - Key by your ‘small’ key plus some arbitrary attribute with
>  higher entropy
>  - Window aggregate first on that artificial key
>  - Aggregate the results on your original ‘small’ key
>  - This could be interesting for high-throughput situations where
>  you actually want to run at a parallelism higher than the number of
>  different ‘small’ keys
>
>
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
> *From:* Yuval Itzchakov 
> *Sent:* Wednesday, 3 November 2021 14:41
> *To:* user 
> *Subject:* Custom partitioning of keys with keyBy
>
>
>
> Hi,
>
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
>
>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as usually the key space is very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute it evenly between all operator indexes in
> the cluster.
>
>
>
> Is there any way of doing this?
>
> --
>
> Best Regards,
> Yuval Itzchakov.
>


RE: Custom partitioning of keys with keyBy

2021-11-03 Thread Schwalbe Matthias
Hi Yuval,

Just a couple of comments:


  *   Assuming that all your 4 different keys are evenly distributed, and you 
send them to (only) 3 buckets, you would expect at least one bucket to cover 2 
of your keys, hence the 50%
  *   With low entropy keys avoiding data skew is quite difficult
  *   But your situation could be worse, all 4 keys could end up in the same 
bucket, if the hash function in use happens to generate collisions for the 4 
keys, in which case 2 of your 3 buckets would not process any events … this 
could also lead to watermarks not progressing …
  *   There are two proposals on how to improve the situation:
 *   Use the same parallelism and max parallelism for the relevant 
operators and implement a manual partitioner
*   A manual partitioner is also good in situations where you want to 
lower the bias, you know the distribution of your key space exactly, and you can 
rearrange keys to even out the numbers
 *   More sophisticated (if possible), divide-and-conquer like (a rough 
sketch follows after this list):
*   Key by your ‘small’ key plus some arbitrary attribute with higher 
entropy
*   Window aggregate first on that artificial key
*   Aggregate the results on your original ‘small’ key
*   This could be interesting for high-throughput situations where you 
actually want to run at a parallelism higher than the number of different ‘small’ 
keys
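
For illustration, a rough sketch of this two-stage approach, assuming a 
hypothetical Event type and a simple sum aggregate (field names, the salt range, 
and the window size are illustrative only):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TwoStageAggregationSketch {
  // Hypothetical event/result types, just for illustration.
  case class Event(smallKey: Int, deviceUdid: String, amount: Long)
  case class Partial(smallKey: Int, sum: Long)

  def twoStageAggregate(events: DataStream[Event]): DataStream[Partial] = {
    // Stage 1: key by (small key, high-entropy salt) so the pre-aggregation can
    // run at a parallelism higher than the number of different small keys.
    val preAggregated = events
      .keyBy(e => (e.smallKey, (e.deviceUdid.hashCode & Integer.MAX_VALUE) % 16))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce((a, b) => a.copy(amount = a.amount + b.amount))
      .map(e => Partial(e.smallKey, e.amount))

    // Stage 2: combine the partial results per original small key.
    preAggregated
      .keyBy(_.smallKey)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce((a, b) => Partial(a.smallKey, a.sum + b.sum))
  }
}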

Hope this helps

Thias


From: Yuval Itzchakov 
Sent: Wednesday, 3 November 2021 14:41
To: user 
Subject: Custom partitioning of keys with keyBy

Hi,
I have a use-case where I'd like to partition a KeyedDataStream a bit 
differently than how Flink's default partitioning works with key groups.

[inline image omitted]
What I'd like to be able to do is take all my data and split it up evenly 
between 3 buckets which will store the data in the state. Using the key above 
works, but splits the data unevenly between the different key groups, as 
usually the key space is very small (0 - 3). What ends up happening is that 
sometimes 50% of the keys end up on the same operator index, where ideally I'd 
like to distribute it evenly between all operator indexes in the cluster.

Is there any way of doing this?
--
Best Regards,
Yuval Itzchakov.