[ 
https://issues.apache.org/jira/browse/FLINK-19255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Bendickson updated FLINK-19255:
------------------------------------
    Description: 
Currently, Async IO operators are no longer chained. Instead, every operator 
using Async IO starts a new operator chain (it becomes the chain head), as a 
temporary workaround for this issue: 
https://issues.apache.org/jira/browse/FLINK-13063

 

However, this change can have a very large impact on job graph size (and does, 
in my customers' cases), and people had previously been satisfied with their 
results. For these reasons, the 1.10 release made AsyncWaitOperator chainable 
again in this issue: 
https://issues.apache.org/jira/browse/FLINK-16219.

 

However, calling operator factory methods directly is complicated and 
unintuitive for users. I have users who would very much prefer that their 
Async IO calls not start a new chain, because this has ballooned the number of 
state stores they have, and they were satisfied with their previous results. 
The only example I could find was in the tests, and it's rather convoluted.

 

My proposal would be to add a config check in AsyncWaitOperator.java just 
before the following line, so that whether it takes effect depends on the 
config. This line is currently hardcoded into the operator, and it is what 
forces one to use the operator factory:
{noformat}
setChainingStrategy(ChainingStrategy.ALWAYS);{noformat}
 

[https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
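To illustrate the proposed check, here is a minimal, self-contained sketch of 
how a config flag could select the chaining strategy instead of hardcoding it. 
It assumes a plain key/value map in place of Flink's Configuration class. The 
local ChainingStrategy enum only mirrors the constant names of Flink's enum, 
and AsyncChainingDecision / chainingStrategyFor are hypothetical names, not 
existing Flink API. The option key is the pipeline.* candidate proposed in 
this issue and does not exist in Flink.

```java
import java.util.Map;

/** Local stand-in that only mirrors the constant names of Flink's ChainingStrategy. */
enum ChainingStrategy { ALWAYS, NEVER, HEAD }

/** Hypothetical helper: picks the Async IO chaining strategy from config. */
final class AsyncChainingDecision {
    // Candidate option name proposed in this issue; NOT an existing Flink option.
    static final String LEGACY_CHAINING_KEY =
            "pipeline.operator-chaining.async-io.legacy-inconsistent-chaining-strategy-always";

    private AsyncChainingDecision() {}

    static ChainingStrategy chainingStrategyFor(Map<String, String> config) {
        // Opt-in only: a missing or non-"true" value keeps the current workaround.
        boolean legacyChaining =
                Boolean.parseBoolean(config.getOrDefault(LEGACY_CHAINING_KEY, "false"));
        // ALWAYS lets the operator chain with its predecessor (the legacy behavior);
        // HEAD makes it start a new chain (the FLINK-13063 workaround).
        return legacyChaining ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD;
    }
}
```

In the operator, the hardcoded call would then hypothetically become something 
like setChainingStrategy(chainingStrategyFor(...)). Defaulting to HEAD keeps 
today's safer workaround unless a user explicitly opts in, which matches the 
intent of naming the option legacy / unsafe.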

 

Given that this is considered potentially unsafe / legacy behavior, I suggest 
adding a config option whose name explicitly marks it as unsafe / legacy. Users 
would then not have to go through the unintuitive process of using operator 
factories, while new users would know to avoid this option or to use it at 
their own risk. If need be, we could also document that it may not be 
supported in the future.

 

My suggested names for the config that would control that setChainingStrategy 
line include
{noformat}
taskmanager.async-io-operator.legacy-unsafe-chaining-strategy{noformat}
which specifically calls this behavior out as legacy and unsafe.

 

Another possible name is
{noformat}
pipeline.operator-chaining.async-io.legacy-inconsistent-chaining-strategy-always{noformat}
which would be more in line with the existing pipeline.operator-chaining 
option.

 

 

Since it is already possible to keep Async IO operators chained, just very 
unintuitive because it requires using operator factories, I think this 
configuration would be a good addition. I would be happy to submit a PR with 
tests and updated documentation, so that power users who want this behavior 
could enable or disable it without having to change much of their code.

 

I recognize that this might be an odd request, since this behavior has been 
deemed unsafe, but the change has made it very difficult for some of my users 
to use RocksDB: namely those with very large state who previously made very 
liberal use of Async IO (especially for things like analytics events, which can 
be sent on a best-effort basis) and who therefore have a very large job graph 
after this change.
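To make the job graph growth concrete, the sketch below counts chains in a 
linear pipeline under a deliberately simplified model: the first operator 
starts a chain, a HEAD operator starts a new one, and an ALWAYS operator joins 
its predecessor's chain. Real Flink chaining also depends on parallelism, 
partitioning, slot sharing groups and other factors, all ignored here. 
ChainCountModel is a hypothetical name used only for illustration.

```java
import java.util.List;

/**
 * Simplified, hypothetical model of operator chaining in a linear pipeline.
 * It ignores parallelism, partitioning and slot sharing, which real Flink
 * chaining also takes into account.
 */
final class ChainCountModel {
    /** Reduced stand-in for Flink's chaining strategies (NEVER omitted). */
    enum Strategy { ALWAYS, HEAD }

    private ChainCountModel() {}

    /** Counts chains: the first operator and every HEAD operator start a new chain. */
    static int countChains(List<Strategy> pipeline) {
        int chains = 0;
        for (int i = 0; i < pipeline.size(); i++) {
            if (i == 0 || pipeline.get(i) == Strategy.HEAD) {
                chains++; // a new chain, i.e. a new job vertex, begins here
            }
        }
        return chains;
    }
}
```

For example, source -> map -> async -> map -> async -> sink with both async 
operators set to HEAD yields 3 chains in this model, versus 1 chain when they 
are ALWAYS. Each extra chain is an extra job vertex with a network exchange in 
front of it.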

 

If anybody has better suggestions for names, I'm open to them. And, as 
mentioned, I'd be happy to submit a PR with tests, etc.

 

For reference, here is the test where I found how to use the operator factory, 
including the utility function needed to create a chained Async IO operator 
vertex. Note that this utility function lives in the test code and is not part 
of the public API:
[https://github.com/apache/flink/blob/3a04e179e09224b09c4ee656d31558844da83a26/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java#L880-L912]

If there is a simpler way to handle this, I'd be happy to hear it. Otherwise, 
since this behavior is already explicitly supported (as called out in the Flink 
1.11 changelog), I think it makes sense to add a config and document that it 
is legacy, unsafe (or inconsistent, up to you) behavior that could go away at 
any time.

 

But requiring users to jump through so many extra hoops in code seems 
unnecessary, especially for users who share operators across different jobs 
that might be configured with different state backends. Moreover, some of these 
users want the legacy behavior while others would prefer to play it safe and 
accept the additional shuffle. A code change is therefore not always feasible 
when code is shared, whereas enabling / disabling a cluster-level config would 
still allow the code to be shared.
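A cluster-level switch lets the same shared job code chain or not depending on 
where it runs. The sketch below reads the taskmanager.* candidate name from 
this issue out of flink-conf.yaml-style "key: value" lines and defaults to 
false. It is a simplified flat parser written for illustration, not Flink's 
actual configuration loader, and ClusterAsyncChainingFlag / isEnabled are 
hypothetical names.

```java
import java.util.List;

/** Hypothetical, simplified reader for a cluster-level Async IO chaining flag. */
final class ClusterAsyncChainingFlag {
    // Candidate option name proposed in this issue; NOT an existing Flink option.
    static final String KEY = "taskmanager.async-io-operator.legacy-unsafe-chaining-strategy";

    private ClusterAsyncChainingFlag() {}

    /** Returns true only if KEY is set to "true" (case-insensitive); defaults to false. */
    static boolean isEnabled(List<String> confLines) {
        boolean enabled = false;
        for (String raw : confLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int sep = line.indexOf(':');
            if (sep < 0) {
                continue; // not a "key: value" pair
            }
            String key = line.substring(0, sep).trim();
            if (key.equals(KEY)) {
                // In this sketch the last occurrence of the key wins.
                enabled = Boolean.parseBoolean(line.substring(sep + 1).trim());
            }
        }
        return enabled;
    }
}
```

With this design, one cluster's config can enable the legacy chaining while 
another's leaves it off, and the shared operator code does not change.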

 

I'd be happy to submit a patch once this issue is discussed.

 

Thank you,

Kyle B. - Data Services @ Tinder 


> Add configuration to make AsyncWaitOperation Chainable
> ------------------------------------------------------
>
>                 Key: FLINK-19255
>                 URL: https://issues.apache.org/jira/browse/FLINK-19255
>             Project: Flink
>          Issue Type: Task
>          Components: API / Core
>    Affects Versions: 1.10.2, 1.11.2
>         Environment: Any Flink job using Async IO after this PR: 
> [https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
> (so, I believe, anything starting from either 1.9 or 1.10).
>  
>            Reporter: Kyle Bendickson
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
