RE: Sharding of Operators

2021-02-22 Thread Tripathi,Vikash
Thanks Chesnay, that answers my question.

In my case NextOp is operating on a keyed stream, and it now makes sense to me that
along with the key re-distribution the state will also be re-distributed, so
effectively the ‘NextOp4’ instance can process all the tuples for key ‘A’ together:
those that were seen earlier as well as those arriving now.

From: Chesnay Schepler 
Sent: Monday, February 22, 2021 4:50 PM
To: Tripathi,Vikash; yidan zhao
Cc: user
Subject: Re: Sharding of Operators

Let's clarify what NextOp is.
Is it operating on a keyed stream, or is it something like a map function that has
some internal state based on the keys of the input elements (e.g., it keeps something
like a Map that it queries/modifies for each input element)?

If NextOp operates on a keyed stream, then its (keyed) state will be redistributed.
If it is not keyed state, then, in your example, it will never receive another
element with key A.
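To make the distinction concrete, here is a minimal Java sketch of the two variants
(the class names and the per-key counting logic are made up for illustration, not
taken from this thread): the first keeps its count in Flink keyed state, which is
snapshotted and re-distributed along with the keys on rescaling; the second keeps a
plain HashMap inside the function, which Flink neither snapshots nor re-partitions.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Variant 1: "NextOp" on a keyed stream, used e.g. as
    // stream.keyBy(v -> v).process(new KeyedNextOp()).
    // The per-key counter lives in keyed state, so after a rescaled restore it
    // travels with key "A" to whichever instance now owns that key.
    public class KeyedNextOp extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<Long> countPerKey;

        @Override
        public void open(Configuration parameters) {
            countPerKey = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            Long current = countPerKey.value();
            long updated = (current == null ? 0L : current) + 1;
            countPerKey.update(updated);
            out.collect(ctx.getCurrentKey() + " -> " + updated);
        }
    }

    // Variant 2: a map function with its own HashMap, used e.g. as
    // stream.map(new UnkeyedNextOp()).
    // This map is not Flink state: it is neither snapshotted nor re-partitioned, so
    // after rescaling a different instance may receive key "A" and count from zero.
    class UnkeyedNextOp extends RichMapFunction<String, String> {

        private final Map<String, Long> localCounts = new HashMap<>();

        @Override
        public String map(String value) {
            long updated = localCounts.merge(value, 1L, Long::sum);
            return value + " -> " + updated;
        }
    }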

On 2/22/2021 12:07 PM, Tripathi,Vikash wrote:
Just needed more clarity in terms of a processing scenario.

Say I had records of key ‘A’ on parallel instance ‘Op1’ of operator ‘Op’, and the next
operator ‘NextOp’ in the sequence of transformations was receiving records of key ‘A’
on its parallel instance ‘NextOp2’ at the time the savepoint was taken.

Now the application has been rescaled to a parallelism of, say, 4 as against the 2
that was in place at the time of the savepoint.

Now, let’s say records for key ‘A’ land on ‘NextOp4’, a parallel instance of the
‘NextOp’ operator, after re-scaling, but the work done in this ‘NextOp’ operator
involves an event-time window that has still not fired even after restarting the
application from the previous savepoint. Some records of the same key ‘A’ sit together
waiting to be processed on parallel instance ‘NextOp2’, as was the case at the time of
the savepoint, while the new records for that key are now redirected together to
parallel instance ‘NextOp4’ of the same operator. However, to produce a consistent
result, the event-time window needs to compute over both record sets for key ‘A’,
which are present on different instances of the same operator ‘NextOp’.

How will the Flink runtime handle such a situation?
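For reference, below is a minimal, self-contained sketch of a keyed event-time window
in the spirit of the ‘NextOp’ scenario (the Event type, its field names, and the
5-minute window are assumptions made purely for illustration). In line with the answer
above: the in-flight window contents are keyed state, so after a rescaled restore the
half-finished window for key ‘A’ is carried as a whole to the single instance that now
owns that key, rather than being split across instances.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class NextOpWindowSketch {

        /** Tiny event type made up for illustration: (key, event timestamp, value). */
        public static class Event {
            public String key;
            public long timestamp;
            public long value;

            public Event() {}

            public Event(String key, long timestamp, long value) {
                this.key = key;
                this.timestamp = timestamp;
                this.value = value;
            }

            @Override
            public String toString() {
                return key + "=" + value + "@" + timestamp;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(
                    new Event("A", 1_000L, 1),
                    new Event("A", 2_000L, 2),
                    new Event("B", 1_500L, 5))
               .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Event>forMonotonousTimestamps()
                            .withTimestampAssigner((event, recordTs) -> event.timestamp))
               // Keyed event-time window: its buffered contents are keyed state, so
               // they are re-distributed together with the key when the job is rescaled.
               .keyBy(event -> event.key)
               .window(TumblingEventTimeWindows.of(Time.minutes(5)))
               .reduce((a, b) -> new Event(a.key, Math.max(a.timestamp, b.timestamp), a.value + b.value))
               .print();

            env.execute("nextop-window-sketch");
        }
    }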

From: Chesnay Schepler <mailto:ches...@apache.org>
Sent: Friday, February 19, 2021 12:52 AM
To: yidan zhao <mailto:hinobl...@gmail.com>; 
Tripathi,Vikash <mailto:vikash.tripa...@cerner.com>
Cc: user <mailto:user@flink.apache.org>
Subject: Re: Sharding of Operators

When you change the parallelism, keys are re-distributed across operator instances.

However, this re-distribution is limited to the configured maxParallelism (set via the
ExecutionConfig), which defaults to 128 if no operator exceeded that parallelism on
the first submission.
This cannot be changed after the job has been run without discarding state.

See
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism
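As a hedged illustration of that recommendation, the sketch below pins the max
parallelism explicitly when building the job (the value 128 simply mirrors the default
mentioned above, and the pipeline itself is a made-up placeholder):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplicitMaxParallelism {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Pin the maximum parallelism (the number of key groups) up front so that
            // later rescaling of the job never requires discarding keyed state.
            env.setMaxParallelism(128);

            env.fromElements("A", "B", "A", "C")
               .keyBy(value -> value)          // keyed stream: state is sharded into key groups
               .map(value -> "saw key " + value)
               .returns(Types.STRING)          // lambda map needs an explicit return type hint
               .setMaxParallelism(128)         // the limit can also be pinned per operator
               .print();

            env.execute("explicit-max-parallelism-sketch");
        }
    }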

On 2/18/2021 8:29 AM, yidan zhao wrote:
Actually, we only need to ensure that all records belonging to the same key are
forwarded to the same operator instance 'i'; we do not need to guarantee that this 'i'
is the same as the 'i' in previous savepoints. When the job is restarted, the rule
'records with the same key end up together' still holds, and the additional slots will
certainly be useful, since each slot (operator instance) will be responsible for fewer
keys and therefore fewer records.
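The 'records with the same key end up together' property comes from the fact that the
target instance is computed deterministically from the key and the (fixed) max
parallelism, not remembered from the previous run. The standalone sketch below mimics
that key-group style assignment; it is not Flink's actual code (Flink additionally
scrambles hashCode() with a murmur-style hash inside its KeyGroupRangeAssignment), but
it shows why key "A" always maps to exactly one instance, and why that instance index
may differ between parallelism 2 and 4.

    public class KeyGroupSketch {

        // Map a key into one of maxParallelism key groups. Plain hashCode() here is a
        // placeholder for Flink's internal murmur-style hash of the key's hashCode().
        static int keyGroupFor(Object key, int maxParallelism) {
            return Math.abs(key.hashCode() % maxParallelism);
        }

        // Each operator instance owns a contiguous range of key groups.
        static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
            return keyGroup * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            int maxParallelism = 128;
            int keyGroup = keyGroupFor("A", maxParallelism);

            // The key group of "A" never changes; only the owning instance can change
            // when the parallelism changes, and all records for "A" follow it there.
            System.out.println("parallelism 2 -> instance " + operatorIndexFor(keyGroup, maxParallelism, 2));
            System.out.println("parallelism 4 -> instance " + operatorIndexFor(keyGroup, maxParallelism, 4));
        }
    }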

Tripathi,Vikash <mailto:vikash.tripa...@cerner.com> wrote on Thursday, February 18, 2021, at 12:09 AM:
Hi there,

I wanted to know how the re-partitioning of keys across operator instances happens
when the current operator instances are scaled up or down and we restart our job from
a previous savepoint that had a different number of parallel instances of the same
operator.

My main concern is whether the re-distribution would map the same keys to the same
operator instances as before. If that happens, there would be no added advantage in
adding new task slots for the same operator, because they would remain under-used or
not used at all if all possible key values have been seen earlier. If, on the other
hand, keys are distributed evenly (based on the hash function) across the new parallel
slots as well, won't this cause issues in producing consistent results based on the
operator state provided by the previous savepoint of the application?

Re: Sharding of Operators

2021-02-22 Thread Chesnay Schepler

Let's clarify what NextOp is.
Is it operating on a keyed stream, or is it something like a map function that has
some internal state based on the keys of the input elements (e.g., it keeps something
like a Map that it queries/modifies for each input element)?


If NextOp operates on a keyed stream, then its (keyed) state will be redistributed.
If it is not keyed state, then, in your example, it will never receive another
element with key A.


On 2/22/2021 12:07 PM, Tripathi,Vikash wrote:


Just needed more clarity in terms of a processing scenario.

Say I had records of key ‘A’ on parallel instance ‘Op1’ of operator ‘Op’, and the next
operator ‘NextOp’ in the sequence of transformations was receiving records of key ‘A’
on its parallel instance ‘NextOp2’ at the time the savepoint was taken.


Now the application has been rescaled to a parallelism of, say, 4 as against the 2
that was in place at the time of the savepoint.


Now, let’s say records for key ‘A’ land on ‘NextOp4’, a parallel instance of the
‘NextOp’ operator, after re-scaling, but the work done in this ‘NextOp’ operator
involves an event-time window that has still not fired even after restarting the
application from the previous savepoint. Some records of the same key ‘A’ sit together
waiting to be processed on parallel instance ‘NextOp2’, as was the case at the time of
the savepoint, while the new records for that key are now redirected together to
parallel instance ‘NextOp4’ of the same operator. However, to produce a consistent
result, the event-time window needs to compute over both record sets for key ‘A’,
which are present on different instances of the same operator ‘NextOp’.


How will the Flink runtime handle such a situation?

From: Chesnay Schepler
Sent: Friday, February 19, 2021 12:52 AM
To: yidan zhao; Tripathi,Vikash
Cc: user
Subject: Re: Sharding of Operators

When you change the parallelism, keys are re-distributed across operator instances.


However, this re-distribution is limited to the configured maxParallelism (set via the
ExecutionConfig), which defaults to 128 if no operator exceeded that parallelism on
the first submission.

This cannot be changed after the job has been run without discarding state.

See
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism


On 2/18/2021 8:29 AM, yidan zhao wrote:

Actually, we only need to ensure that all records belonging to the same key are
forwarded to the same operator instance 'i'; we do not need to guarantee that this 'i'
is the same as the 'i' in previous savepoints. When the job is restarted, the rule
'records with the same key end up together' still holds, and the additional slots will
certainly be useful, since each slot (operator instance) will be responsible for fewer
keys and therefore fewer records.

Tripathi,Vikash <mailto:vikash.tripa...@cerner.com> wrote on Thursday, February 18, 2021, at 12:09 AM:

Hi there,

I wanted to know how the re-partitioning of keys across operator instances happens
when the current operator instances are scaled up or down and we restart our job
from a previous savepoint that had a different number of parallel instances of the
same operator.

My main concern is whether the re-distribution would map the same keys to the same
operator instances as before. If that happens, there would be no added advantage in
adding new task slots for the same operator, because they would remain under-used or
not used at all if all possible key values have been seen earlier. If, on the other
hand, keys are distributed evenly (based on the hash function) across the new
parallel slots as well, won't this cause issues in producing consistent results
based on the operator state provided by the previous savepoint of the application?

Is there a guarantee, given by the hash function in the attached snippet, that the
same keys which landed earlier on an operator instance will land back on the same
operator instance once the job is restarted with the new parallelism configuration?

Thanks,

Vikash


RE: Sharding of Operators

2021-02-22 Thread Tripathi,Vikash
Just needed more clarity in terms of a processing scenario.

Say I had records of key ‘A’ on parallel instance ‘Op1’ of operator ‘Op’, and the next
operator ‘NextOp’ in the sequence of transformations was receiving records of key ‘A’
on its parallel instance ‘NextOp2’ at the time the savepoint was taken.

Now the application has been rescaled to a parallelism of, say, 4 as against the 2
that was in place at the time of the savepoint.

Now, let’s say records for key ‘A’ land on ‘NextOp4’, a parallel instance of the
‘NextOp’ operator, after re-scaling, but the work done in this ‘NextOp’ operator
involves an event-time window that has still not fired even after restarting the
application from the previous savepoint. Some records of the same key ‘A’ sit together
waiting to be processed on parallel instance ‘NextOp2’, as was the case at the time of
the savepoint, while the new records for that key are now redirected together to
parallel instance ‘NextOp4’ of the same operator. However, to produce a consistent
result, the event-time window needs to compute over both record sets for key ‘A’,
which are present on different instances of the same operator ‘NextOp’.

How will the Flink runtime handle such a situation?

From: Chesnay Schepler 
Sent: Friday, February 19, 2021 12:52 AM
To: yidan zhao; Tripathi,Vikash
Cc: user
Subject: Re: Sharding of Operators

When you change the parallelism, keys are re-distributed across operator instances.

However, this re-distribution is limited to the configured maxParallelism (set via the
ExecutionConfig), which defaults to 128 if no operator exceeded that parallelism on
the first submission.
This cannot be changed after the job has been run without discarding state.

See
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism

On 2/18/2021 8:29 AM, yidan zhao wrote:
Actually, we only need to ensure that all records belonging to the same key are
forwarded to the same operator instance 'i'; we do not need to guarantee that this 'i'
is the same as the 'i' in previous savepoints. When the job is restarted, the rule
'records with the same key end up together' still holds, and the additional slots will
certainly be useful, since each slot (operator instance) will be responsible for fewer
keys and therefore fewer records.

Tripathi,Vikash <mailto:vikash.tripa...@cerner.com> wrote on Thursday, February 18, 2021, at 12:09 AM:
Hi there,

I wanted to know how the re-partitioning of keys across operator instances happens
when the current operator instances are scaled up or down and we restart our job from
a previous savepoint that had a different number of parallel instances of the same
operator.

My main concern is whether the re-distribution would map the same keys to the same
operator instances as before. If that happens, there would be no added advantage in
adding new task slots for the same operator, because they would remain under-used or
not used at all if all possible key values have been seen earlier. If, on the other
hand, keys are distributed evenly (based on the hash function) across the new parallel
slots as well, won't this cause issues in producing consistent results based on the
operator state provided by the previous savepoint of the application?

Is there a guarantee, given by the hash function in the attached snippet, that the
same keys which landed earlier on an operator instance will land back on the same
operator instance once the job is restarted with the new parallelism configuration?

Thanks,
Vikash







Re: Sharding of Operators

2021-02-18 Thread Chesnay Schepler
When you change the parallelism, keys are re-distributed across operator instances.


However, this re-distribution is limited to the configured maxParallelism (set via the
ExecutionConfig), which defaults to 128 if no operator exceeded that parallelism on
the first submission.

This cannot be changed after the job has been run without discarding state.

See 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism


On 2/18/2021 8:29 AM, yidan zhao wrote:
Actually, we only need to ensure that all records belonging to the same key are
forwarded to the same operator instance 'i'; we do not need to guarantee that this 'i'
is the same as the 'i' in previous savepoints. When the job is restarted, the rule
'records with the same key end up together' still holds, and the additional slots will
certainly be useful, since each slot (operator instance) will be responsible for fewer
keys and therefore fewer records.


Tripathi,Vikash wrote on Thursday, February 18, 2021, at 12:09 AM:


Hi there,

I wanted to know how the re-partitioning of keys across operator instances
happens when the current operator instances are scaled up or down and we
restart our job from a previous savepoint that had a different number of
parallel instances of the same operator.

My main concern is whether the re-distribution would map the same keys to
the same operator instances as before. If that happens, there would be no
added advantage in adding new task slots for the same operator, because
they would remain under-used or not used at all if all possible key values
have been seen earlier. If, on the other hand, keys are distributed evenly
(based on the hash function) across the new parallel slots as well, won't
this cause issues in producing consistent results based on the operator
state provided by the previous savepoint of the application?

Is there a guarantee, given by the hash function in the attached snippet,
that the same keys which landed earlier on an operator instance will land
back on the same operator instance once the job is restarted with the new
parallelism configuration?

Thanks,

Vikash






Re: Sharding of Operators

2021-02-17 Thread yidan zhao
Actually, we only need to ensure that all records belonging to the same key are
forwarded to the same operator instance 'i'; we do not need to guarantee that this 'i'
is the same as the 'i' in previous savepoints. When the job is restarted, the rule
'records with the same key end up together' still holds, and the additional slots will
certainly be useful, since each slot (operator instance) will be responsible for fewer
keys and therefore fewer records.

Tripathi,Vikash wrote on Thursday, February 18, 2021, at 12:09 AM:

> Hi there,
>
>
>
> I wanted to know how the re-partitioning of keys across operator instances happens
> when the current operator instances are scaled up or down and we restart our job
> from a previous savepoint that had a different number of parallel instances of the
> same operator.
>
>
>
> My main concern is whether the re-distribution would map the same keys to the same
> operator instances as before. If that happens, there would be no added advantage in
> adding new task slots for the same operator, because they would remain under-used or
> not used at all if all possible key values have been seen earlier. If, on the other
> hand, keys are distributed evenly (based on the hash function) across the new
> parallel slots as well, won't this cause issues in producing consistent results
> based on the operator state provided by the previous savepoint of the application?
>
>
>
> Is there a guarantee, given by the hash function in the attached snippet, that the
> same keys which landed earlier on an operator instance will land back on the same
> operator instance once the job is restarted with the new parallelism configuration?
>
>
>
> Thanks,
>
> Vikash
>
>
>
>