Hi, does anyone have an idea of what causes this and how it can be resolved? Thanks.

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Wednesday, December 5, 2018 12:44 AM, Flink Developer 
<developer...@protonmail.com> wrote:

> I have a Flink app with high parallelism (400) running in AWS EMR on
> Flink v1.5.2. It sources from Kafka and sinks to S3 using BucketingSink,
> with the RocksDB state backend for checkpointing. The destination is defined
> with the "s3a://" prefix. The Flink job is a streaming app which runs
> continuously. At any given time, each worker may be writing to its own part
> file in S3, so all workers combined can have up to 400 part files open at
> once (one per parallel subtask).
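>
> For reference, the pipeline is wired up roughly like the following
> simplified sketch (the topic name, bucket paths, group id, and the
> size/interval values are placeholders, not the real job's settings):
>
>     import java.util.Properties;
>
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>     import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
>
>     public class KafkaToS3Job {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>             env.setParallelism(400);
>             env.enableCheckpointing(60_000);  // checkpoint interval (placeholder)
>             env.setStateBackend(new RocksDBStateBackend("s3a://bucket/checkpoints"));  // placeholder path
>
>             Properties kafkaProps = new Properties();
>             kafkaProps.setProperty("bootstrap.servers", "broker:9092");  // placeholder
>             kafkaProps.setProperty("group.id", "my-group");              // placeholder
>
>             DataStream<String> stream = env.addSource(
>                     new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), kafkaProps));
>
>             // Each parallel subtask writes its own .in-progress part file; on roll-over
>             // the file is renamed to .pending and finalized once the enclosing checkpoint
>             // completes. On s3a:// a rename is a server-side copy plus delete, which is
>             // where the copyFile call in the stack trace below comes from.
>             BucketingSink<String> sink = new BucketingSink<>("s3a://bucket/output");  // placeholder path
>             sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH"));
>             sink.setBatchSize(128L * 1024 * 1024);  // roll part files at ~128 MB (placeholder)
>
>             stream.addSink(sink);
>             env.execute("kafka-to-s3");
>         }
>     }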
>
> After a few days, one of the workers will fail with the exception:
>
>     org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200; Error Code: InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy)
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>
> This seems to occur when the BucketingSink rolls over to a new part file.
> The odd thing is that the failure strikes at unpredictable times, yet it
> shows up consistently across separate job executions. When it occurs, it
> affects only one of the parallel Flink workers (not all of them). Also, when
> this happens, the Flink job transitions into the FAILING state, but it does
> not restart and resume/recover from the last successful checkpoint.
>
> What is the cause of this and how can it be resolved? Also, how can the job
> be configured to restart/recover from the last successful checkpoint instead
> of staying in the FAILING state?
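>
> For reference, this is roughly what I would expect to have to add on top of
> the existing checkpoint configuration (a minimal sketch; the attempt count
> and delay are placeholder values, not something the job currently sets):
>
>     import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>     import org.apache.flink.api.common.time.Time;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     // Retry the whole job up to 3 times, waiting 10 seconds between attempts.
>     // With checkpointing enabled, each restart should resume from the last
>     // completed checkpoint rather than reprocessing everything from scratch.
>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
>
> The same strategy can also be set cluster-wide through the restart-strategy
> settings in flink-conf.yaml.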
