[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability

2018-10-29 Thread Jonathan Miles (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667474#comment-16667474
 ] 

Jonathan Miles commented on FLINK-9061:
---

Agreed it should be harder now, but we were able to trigger throttling while 
testing 1.4.2 around mid-August, after that announcement was made. We had 
around 10 jobs checkpointing to the same bucket, each with a different prefix, 
across something like 400 Task Managers. I know there was some work done to 
reduce the number of S3 requests made, and combined with this "prefix entropy" 
change we haven't seen it happen again.

It might be useful to add [your 
link|https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/]
 to the documentation.
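
A minimal sketch of what the entropy-injection configuration looks like, 
assuming the option names that shipped with this change in 1.6.2/1.7 (worth 
double-checking against the documentation for the release in use):

{code}
# flink-conf.yaml (sketch; option names assumed from the FLINK-9061 change).
# The "_entropy_" marker in the checkpoint path is replaced by random
# characters for checkpoint data files and stripped for metadata paths.
s3.entropy.key: _entropy_
s3.entropy.length: 4
state.checkpoints.dir: s3://bucket/_entropy_/flink/checkpoints
{code}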

> Add entropy to s3 path for better scalability
> ---------------------------------------------
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2, 1.5.0
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.2, 1.7.0
>
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
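
A rough, purely illustrative sketch of the path rewrite proposed above; the 
hash choice, prefix length and class/method names are assumptions for 
illustration, not the change that was eventually merged:

{code}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class EntropyPathSketch {

    // Rewrites s3://bucket/flink/checkpoints to s3://bucket/<hash>/flink/checkpoints,
    // so the hash sits at the start of the object key (the part S3 uses for partitioning).
    static String addEntropy(String checkpointPath) throws NoSuchAlgorithmException {
        int keyStart = checkpointPath.indexOf('/', "s3://".length());
        String bucket = checkpointPath.substring(0, keyStart);   // "s3://bucket"
        String key = checkpointPath.substring(keyStart + 1);     // "flink/checkpoints"

        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(key.getBytes(StandardCharsets.UTF_8));

        // A short prefix is enough to spread writes across key ranges: 2 bytes = 4 hex chars.
        StringBuilder hash = new StringBuilder();
        for (int i = 0; i < 2; i++) {
            hash.append(String.format("%02x", digest[i] & 0xff));
        }
        return bucket + "/" + hash + "/" + key;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Prints s3://bucket/<4 hex chars>/flink/checkpoints
        System.out.println(addEntropy("s3://bucket/flink/checkpoints"));
    }
}
{code}

Note that a deterministic hash of the full path means all writes for one 
checkpoint directory still share a prefix; the gain, as described in the 
proposal, is that different jobs and paths spread evenly across prefixes.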





[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-14 Thread Jonathan Miles (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579950#comment-16579950
 ] 

Jonathan Miles commented on FLINK-9061:
---

Is the PR still being considered for merging? We seem to be running into the 
S3 throttling issue with Flink 1.4.2 and a large stateful job.

I'll come back with more information; I've only just started investigating and 
came across this ticket.

> add entropy to s3 path for better scalability
> ---------------------------------------------
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  





[jira] [Commented] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11

2018-07-25 Thread Jonathan Miles (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556223#comment-16556223
 ] 

Jonathan Miles commented on FLINK-9700:
---

Duplicating the comment I added to FLINK-9690, since this is worth documenting 
as well...

Although the protocol is backwards compatible, there is a performance penalty 
when using the 0.11 consumer and producer with later Kafka brokers. The message 
format has changed between releases, and using the older connectors forces the 
brokers to convert between the on-disk and wire formats, so we lose the 
zero-copy transfer path and add extra GC load, among other issues.

There's a note about this in the Kafka documentation. Although it discusses 
conversions between different versions than the ones involved here, the effect 
is the same:

https://kafka.apache.org/0102/documentation.html#upgrade_10_performance_impact
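
One possible broker-side mitigation, hedged: if the brokers have been upgraded 
but most clients still speak the 0.11 protocol, the linked upgrade notes 
describe pinning the on-disk message format to the clients' version so the 
brokers don't down-convert on every fetch (which is what costs the zero-copy 
path). Sketch only; verify the value against the broker version in use:

{code}
# server.properties (broker side) - illustrative only
log.message.format.version=0.11.0
{code}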

> Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
> ---------------------------------------------------------------
>
> Key: FLINK-9700
> URL: https://issues.apache.org/jira/browse/FLINK-9700
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.0
>Reporter: Ufuk Celebi
>Assignee: vinoyang
>Priority: Minor
>
> FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API 
> limitations of the Kafka client. Using reflection breaks with newer versions 
> of the Kafka client (due to internal changes of the client).
> The documentation does not mention newer Kafka versions. We should add the 
> following notes:
> - Only package Kafka connector with kafka.version property set to 0.11.*.*
> - Mention that it is possible to use the 0.11 connector with newer versions 
> of Kafka as the protocol seems to be backwards compatible (double check that 
> this is correct)
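
To make the first note concrete, a hypothetical build invocation; the 
kafka.version property name comes from the note above, while the module path 
and the exact 0.11.x version are assumptions:

{code}
# From the Flink source tree: package the 0.11 connector against a 0.11 client.
mvn clean package -pl flink-connectors/flink-connector-kafka-0.11 -am -Dkafka.version=0.11.0.2
{code}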





[jira] [Commented] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails

2018-07-25 Thread Jonathan Miles (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556222#comment-16556222
 ] 

Jonathan Miles commented on FLINK-9690:
---

Although the protocol is backwards compatible, there is a performance penalty 
when using the 0.11 consumer and producer with later Kafka brokers. The message 
format has changed between releases, and using the older connectors forces the 
brokers to convert between the on-disk and wire formats, so we lose the 
zero-copy transfer path and add extra GC load, among other issues.

There's a note about this in the Kafka documentation. Although it discusses 
conversions between different versions than the ones involved here, the effect 
is the same:

https://kafka.apache.org/0102/documentation.html#upgrade_10_performance_impact

> Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
> --------------------------------------------------------------------
>
> Key: FLINK-9690
> URL: https://issues.apache.org/jira/browse/FLINK-9690
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Ufuk Celebi
>Priority: Major
>
> Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} 
> packaged with {{kafka.version}} set to {{1.1.0}} in Flink 1.4.2.
> {code}
> java.lang.RuntimeException: Incompatible KafkaProducer version
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldException: sequenceNumbers
> at java.lang.Class.getDeclaredField(Class.java:2070)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297)
> ... 16 more
> {code}
> [~pnowojski] Any ideas about this issue? Judging from the stack trace it was 
> anticipated that reflective access might break with Kafka versions > 0.11.2.0.
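
To make the failure mode easier to follow: the producer state is recovered by 
reading client-internal fields reflectively, and when a newer kafka-clients jar 
renames or removes one of those fields the lookup fails. A minimal, 
hypothetical sketch of that pattern (not the actual FlinkKafkaProducer code; 
the field and message names simply mirror the trace above):

{code}
import java.lang.reflect.Field;

public class ReflectiveAccessSketch {

    // Stand-in for a Kafka client internal whose private fields change between releases.
    static class ClientInternals {
        private final String sequenceNumbers = "present in 0.11.x clients";
    }

    // Reads a private field by name - the same general pattern as FlinkKafkaProducer.getValue().
    static Object getValue(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // When the field no longer exists in a newer client, the caller only
            // sees this generic error - exactly what the stack trace above shows.
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    public static void main(String[] args) {
        ClientInternals client = new ClientInternals();
        System.out.println(getValue(client, "sequenceNumbers")); // works: field exists
        getValue(client, "someRenamedField"); // throws "Incompatible KafkaProducer version"
    }
}
{code}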


