[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426132#comment-16426132
 ] 

Jamie Grier commented on FLINK-9061:


Yup, sounds good to me :)

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426124#comment-16426124
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

[~jgrier] Amazon doesn't want to reveal internal details, hence sometimes are 
pretty vague. Mu understanding is that prefix (before the random/entropy part) 
has to be fixed. Either way, latest proposal doesn't prevent user from setting 
the first a few chars as random/entropy part. 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426103#comment-16426103
 ] 

Jamie Grier commented on FLINK-9061:


Okay, this is the best documentation I've found on this:  
[https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html]
 and even it is very vague.

It does appear that it doesn't have to be the very first characters but it 
brings up an interesting question.  What are the exact constraints here?  Which 
part of the key name is and isn't used for partitioning exactly?  I mean 
technically all of our checkpoint objects do in fact have several characters of 
uniqueness since the last part of the full object key name is the GUID.

Anyway, not having full info sucks.

[~stevenz3wu] I think your proposal sounds good.  Thanks for offering to do the 
PR :)  That should work well and logical listing of sub-directories should 
still be possible in this scheme by issuing parallel s3 list requests for each 
possible prefix and merging the results.

Shall we proceed with this approach then?

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424552#comment-16424552
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

it seems that S3 walk through the prefix from left to right until it finds some 
randomness for partitioning. it is more sophisticated than just first a few 
chars.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423335#comment-16423335
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

I don't know if it has to be "the very first characters". Amazon didn't 
explicitly say it. Let me see if I can confirm it.

I still like the flexibility of giving user the control on key names (including 
entropy part).

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423089#comment-16423089
 ] 

Jamie Grier commented on FLINK-9061:


As I understand it the above doesn't work – maybe if you ask Amazon to set up 
buckets for you manually this could be made to work, but I think in the normal 
case the very first characters of the key name must introduce the randomness.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423080#comment-16423080
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

reversing the components (split by slash char) doesn't give user the control on 
key names. e.g. User might want to keep the top level (first component) clean.
s3://bucket/checkpoints/...s3://bucket/savepoints/...

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423072#comment-16423072
 ] 

Jamie Grier commented on FLINK-9061:


So, what I'm suggesting is that we, optionally, split on '/' and reverse the 
components like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/CHK_000/JOB_ID/checkpoints

It's not very ops friendly but that's because S3 isn't a filesystem.  The 
hierarchy isn't real.  The equivalent of a directory listing that would group 
all the checkpoints for a single job would be:

aws s3 list s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case.  What about others?

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422976#comment-16422976
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

I think S3 has more sophisticated pattern searching for entropy (not just from 
the absolutely beginning). we have asked S3 team to  shard the bucket with key 
names like "checkpoints/*/*" or 
"checkpoints/*-*" 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422890#comment-16422890
 ] 

Greg Hogan commented on FLINK-9061:
---

Since S3 key names are opaque it sounds like any "prefix" is counted for the 
initial entropy (so "c/" instead of "checkpoints/"?).

This doesn't seem very ops-friendly if various points from various jobs are now 
intermixed by hash.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422843#comment-16422843
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

Usually 4-char random prefix can go a long way. Even 2-char random prefix can 
be sufficient unless super high workload. Again, as Steve Loughran, nothing 
official. just based on experience and speculation. But I think we should give 
user the control the number of characters for the entropy part.

[~jgrier]  flink checkpoint path is like 
"s3://bucket/checkpoint-path-prefix/chk-121/random-UUID". So yes, reversing key 
name can work. But I think we should give user the control on the part of 
checkpoint path to introduce entropy. e.g. I may want to maintain the top level 
prefixes (checkpoints and savepoints)
{code:java}
s3://bucket/checkpoints//rest-of-checkpoint-path
s3://bucket/savepoints//rest-of-savepointpoint-path{code}
 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422645#comment-16422645
 ] 

Steve Loughran commented on FLINK-9061:
---

something less than 8, maybe 5, though it's mostly all superstition

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422534#comment-16422534
 ] 

Stephan Ewen commented on FLINK-9061:
-

Do we know how many characters are used for the partitioning? The last 
directory segment starts with "{{chk-}}" for all checkpoints.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-30 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420885#comment-16420885
 ] 

Jamie Grier commented on FLINK-9061:


Maybe we should keep this super simple and make a change at the state backend 
level where we optionally just reverse the path.  That should actually work 
very well.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417964#comment-16417964
 ] 

Steve Loughran commented on FLINK-9061:
---

[~greghogan]

I cut the link as it was just a duplicate of the one in the header.

your link is new; something I think I'd seen somewhere else too. It's 
unfortunate that most of our knowledge here is superstition and stack traces, 
but I think that's a deliberate attempt to avoid making any commitment about 
future behaviour. 

 

Here's my beliefs

#.Once you write enough data down a path, a partition is somehow triggered, and 
data split across s3 shards
# that partitioning event is counted as part of your load for the bucket/the IO 
which a partitioning path can sustain is reduced
# so the overall IO rate at that point drops. Maybe that's raising the 500 error

I don't think it partitions based purely on load. That would be fun given you 
can just issue many delete requests to a path and get throttled.


> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417402#comment-16417402
 ] 

Greg Hogan commented on FLINK-9061:
---

[~ste...@apache.org], not sure why your 
[link|https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/]
 isn't showing in your comment. I went looking for more info and found 
[this|https://aws.amazon.com/blogs/aws/amazon-s3-performance-tips-tricks-seattle-hiring-event/]
 explanation for the need to put entropy in the short prefix of the object name:

"In fact, S3 even has an algorithm to detect this parallel type of write 
pattern and will automatically create multiple child partitions from the same 
parent simultaneously – increasing the system’s operations per second budget as 
request heat is detected."

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417145#comment-16417145
 ] 

Steve Loughran commented on FLINK-9061:
---

The s3a connector will have the same issue, though there we can change the 
backoff policy, so you shouldn't see it other than an increase in the throttle 
event metrics. But checkpointing will slow down

Putting the most randomness at the head of the path is the one that s3 likes 
best, as its partitioning is based on the first few characters. And I mean 
"really at the head of the path" : 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/



> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417032#comment-16417032
 ] 

Stephan Ewen commented on FLINK-9061:
-

One advantage of making the changes in the state backend would be that you can 
add entropy to the state files written by the TaskManager, but leave the 
metadata file path as is (easier to find when trying to resume manually from a 
savepoint or externalized checkpoint).

You can also try as a temporary workaround the connection limiting feature of 
the Flink FileSystems: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/filesystems.html#connection-limiting

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416271#comment-16416271
 ] 

Jamie Grier commented on FLINK-9061:


[~StephanEwen] I don't know if the s3a-based connector exhibits the same 
behavior but I suspect it would since this is not a great approach to writing 
checkpoint data to S3 ;) . We could improve on this by changing the retry 
policy in use as [~ste...@apache.org] said but in the end it seems like we'll 
have to introduce the entropy to really solve the problem.

Any further ideas about which approach might be best?  We could make changes at 
the FileSystem or StateBackend level.  And of course it would make listing 
files hard.  Maybe we could use just two digits for the entropy, like 00-99, 
and when listing we could just list all of those and merge the results..

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416254#comment-16416254
 ] 

Jamie Grier commented on FLINK-9061:


Yeah, so I completely agree that should be a 503 but it's not.  I already ran 
this through the AWS channels.  The response was essentially that this was 
"internally" a TooBusyException.  Here's their full response:
{quote}Based on the information provided, I understand that you are 
experiencing some internal errors (status code 500) from S3, which is impacting 
one of your Flink jobs. From the log dive on your provided request IDs, I 
observe that your PutObject request triggered Internal Error with 
TooBusyException. This happens when a bucket receives more requests than it can 
handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests 
per second or more than 300 GET requests per second. So, if your workload is to 
exceed this limit, you'd need to scale your bucket through partitioning. 
Currently, your key space isn't randomized and all your keys include 
"BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". 
Therefore, your bucket isn't being automatically partitioned by S3 and you 
received increased error rates after your requests increased.
{quote}
 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415501#comment-16415501
 ] 

Steve Loughran commented on FLINK-9061:
---

[~StephanEwen]: I knew that, but it's still the same AWS SDK underneath.

500 is not normal throttling; that's 503.

[~jgrier]: What you are seeing here is something has gone wrong in S3 itself. 
Usually transient, treat as retriable on all requests.

S3A on Hadoop 3.1+ will treat as a connectivity error and use whatever settings 
you use there (retryUpToMaximumCountWithFixedSleep policy). If a 500 can be 
caused by overload, that could/should be switched to the exponential backoff 
policy as per 503 events.
 # file a support request with the AWS team including the request ID of a 
failing request
 # add a followup here listing what they said/recommended

Obviously, I can't fix the stack trace here, but we can at least change the S3A 
connector to see this and retry appropriately.

Thank you for finding another interesting failure mode of S3 itself :)

+[~fabbri]

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415368#comment-16415368
 ] 

Stephan Ewen commented on FLINK-9061:
-

[~ste...@apache.org] I think they are not using the hadoop-based connector.
[~jgrier] Does the s3a-based connector exhibit the same behavior?

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414828#comment-16414828
 ] 

Jamie Grier commented on FLINK-9061:


[~ste...@apache.org]

Here's the full stack trace:

 

 

 

"java.lang.Exception: Could not perform checkpoint 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
 ... 8 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b
 in order to obtain the stream state handle
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
 ... 13 common frames omitted
Caused by: java.io.IOException: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended 
Request ID: BLAH_
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
 
org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
 ... 18 common frames omitted
Caused by: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
 

[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414696#comment-16414696
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

It seems that our internal change *only* works with internal checkpoint. It 
doesn't work for external checkpoint and savepoint. I know Flink community is 
also thinking about merging internal/external checkpoint and savepoint. A more 
general discussion may be needed. 

Ideally, what I would like to see for external checkpoint and savepoint.
 * uber metadata file (written by jobmanager) keep the same base path (without 
random/hash entropy). this is needed to help with implementing retention policy 
by prefix query (s3 list). this is not an issue for internal checkpoint, 
because zookeeper gives us such root reference even with entropy substitution.
 * actual data files written by operators from task manager needs to have 
entropy in key name to spread the writes to different S3 partitions.

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414280#comment-16414280
 ] 

Steve Loughran commented on FLINK-9061:
---

you can get it on delete requests too, if you try hard.

Jamie, can I see a full stack trace? I'm curious now as generally the s3 
clients should be retrying it, you'll see a slowdown rather than failure.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

[~StephanEwen] [~jgrier]

We run into S3 throttling issue for two kinds of scenarios. I can see that 
_hash_ approach can work for both scenario.
 * thousands of massively parallel/stateless routing jobs for Keystone data 
pipeline. for that, we configure a *static* 4-char random hex for the 
checkpoint path for each routing job during deployment so that S3 writes *from 
many jobs* are spread different S3 partitions. that requires no change in 
Flink, just some deployment tooling automation. 
 * single large-state job with TBs state and thousands of operators. To avoid 
throttling, we need S3 writes *from many operators* to spread to different S3 
partitions. that's where we want to inject *dynamic* 4-char random hex for each 
S3 write.

But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, 
we can never take a savepoint for case #2 (single large-state job).

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413628#comment-16413628
 ] 

Stephan Ewen commented on FLINK-9061:
-

That would be a great contribution, valuable for many S3 users. Indeed, S3's 
partitioning by prefix 

FYI: The 1.5 release will have a few changes to the state backends that will 
reduce some S3 requests (mainly HEAD, if you use s3a).
The here proposed is still needed, though.

We could approach this in two ways:

  1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

  2. Make this a general change in the S3 filesystem, to use a deterministic 
path hash like Jamie suggested. That would help to solve this across other use 
cases of S3 as well. Whenever the path contains the segment *_hash_* this would 
be replaced by such a hash.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-24 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412814#comment-16412814
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

[~jgrier] Yes, we want to contribute this back. We can probably partner with 
you to get the change upstream.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-24 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412737#comment-16412737
 ] 

Jamie Grier commented on FLINK-9061:


[~stevenz3wu] Did you contribute those changes back to Flink?  I think this 
will be affecting others as well.  Would you consider a contribution back to 
the project?  Otherwise I will do this but if you already have something 
working we might as well use it or base changes on it.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-23 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412326#comment-16412326
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

Jamie,

yes, we run into the same issue at Netflix. We did exactly like what you said. 
Here are our config looks like.

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

we modified state backend code to support it. Without the random chars in the 
path, there is no way for S3 to partition the bucket to support high request 
rate. I don't see other way around it. There is obviously down side with such 
random chars in s3 path. now you can't do prefix listing anymore.

 

Steven

 

 

 

 

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)