[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414085#comment-16414085 ]

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:39 PM:
-----------------------------------------------------------------

[~StephanEwen] [~jgrier]

We run into the S3 throttling issue in two kinds of scenarios.
 * Thousands of massively parallel/stateless routing jobs for the Keystone data 
pipeline. For those, we configure a *static* 4-char random hex in the 
checkpoint path of each routing job during deployment, so that S3 writes *from 
many jobs* are spread across different S3 partitions. That requires no change 
in Flink, just some deployment tooling automation. Also for this case, we set 
`state.backend.fs.memory-threshold` so that task managers do not write state 
files to S3 at all; only the job manager writes one uber metadata file (with 
the state embedded). See the config sketch after this list.
 * A single large-state job with TBs of state written from thousands of 
operators. To avoid throttling, we need S3 writes *from many operators* to be 
spread across different S3 partitions. That is where we want to inject a 
*dynamic* 4-char random hex into each S3 write path, which was my earlier 
`_ENTROPY_KEY_` proposal: when each operator creates its 
FsCheckpointStreamFactory, the `_ENTROPY_KEY_` placeholder in the configured 
path is substituted with a 4-char random hex. See the sketch after this list.
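
For the first scenario, a minimal sketch of the kind of flink-conf.yaml our 
deployment tooling ends up generating (bucket, job name, the "abcd" hex, and 
the threshold value are made-up examples; key names are the 1.4-era ones):

```
# 4-char random hex ("abcd") picked once per job at deployment time,
# so writes from different jobs land on different S3 partitions
state.backend.fs.checkpointdir: s3://my-bucket/abcd/checkpoints/my-routing-job

# large enough that operator state stays inline in the JM metadata file
# instead of being written as separate files from the task managers
state.backend.fs.memory-threshold: 1048576
```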
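
For the second scenario, a rough sketch of the `_ENTROPY_KEY_` idea (the class 
and placeholder name are illustrative, not Flink code): each stream factory 
instance resolves the placeholder to its own random 4-char hex, so concurrent 
writes from many operators fan out across S3 partitions.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustration of the proposal only; not an actual Flink class.
public class EntropyKeyResolver {

    private static final String ENTROPY_KEY = "_ENTROPY_KEY_";

    // Replace the placeholder with a fresh 4-char random hex, e.g.
    // "s3://bucket/_ENTROPY_KEY_/ckpts/job" -> "s3://bucket/9f3a/ckpts/job"
    public static String resolve(String configuredPath) {
        String hex = String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));
        return configuredPath.replace(ENTROPY_KEY, hex);
    }
}
```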

I can see that the hash approach can work for both scenarios, but the hash 
needs to be applied to the complete path (not just the prefix configured in 
flink-conf.yaml).
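
To make that concrete, a rough sketch of hashing the complete path (helper and 
names are hypothetical; the point is only that the hash input includes the 
per-operator/per-file part of the path, not just the configured prefix):

```java
// Illustration only, not Flink code.
public class HashedCheckpointPath {

    // e.g. fullKey = "flink/checkpoints/<job-id>/chk-42/<operator-file>"
    public static String withHashPrefix(String bucket, String fullKey) {
        // derive a short hex prefix from the *whole* key so different
        // operators/files spread across different S3 partitions
        String hex = String.format("%04x", fullKey.hashCode() & 0xffff);
        return "s3://" + bucket + "/" + hex + "/" + fullKey;
    }
}
```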

But I do think we need the _hash_ / _entropy_ for savepoints as well. 
Otherwise, we are likely to get throttled when taking a savepoint of a single 
large-state job.

 



> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Jamie Grier
>            Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  


