I have a Flink job with high parallelism (400) running in AWS EMR. It uses 
Flink v1.5.2, reads from Kafka, and sinks to S3 using BucketingSink (with the 
RocksDB backend for checkpointing). The destination is defined using the 
"s3a://" prefix. The job is a streaming application that runs continuously. At 
any given time, each worker may be writing to a part file in S3, so the 
workers combined can be writing to up to 400 files at once (one per parallel 
subtask).
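
For reference, here is a minimal sketch of how the job is wired up (the topic, 
broker, group id, paths, checkpoint interval, batch size, and writer are 
placeholders rather than the exact production code; the real job writes 
gzipped files and may use a different Kafka connector version):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class KafkaToS3Job {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(400);
            env.enableCheckpointing(60_000);                                        // checkpoint interval is a placeholder
            env.setStateBackend(new RocksDBStateBackend("s3a://bucket/checkpoints")); // RocksDB backend, checkpoints on S3

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "broker:9092");
            kafkaProps.setProperty("group.id", "my-group");

            DataStream<String> events = env.addSource(
                    new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), kafkaProps));

            // One open part file per parallel subtask per bucket -> up to 400 files at a time.
            BucketingSink<String> sink = new BucketingSink<>("s3a://bucket");
            sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH"));
            sink.setWriter(new StringWriter<>());   // the real job uses a compressing (gzip) writer
            sink.setBatchSize(1024L * 1024 * 128);  // roll part files at ~128 MB

            events.addSink(sink);
            env.execute("kafka-to-s3");
        }
    }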

After a few days, one of the workers fails with the following exception:

    org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)

The failure seems to occur when the BucketingSink creates a new part file. The 
odd thing is that it strikes at random times yet shows up consistently across 
separate job executions, and it always hits just one of the parallel Flink 
workers, not all of them. When it happens, the job transitions into the 
FAILING state but never restarts or resumes/recovers from the last successful 
checkpoint.

What is the cause of this, and how can it be resolved? Also, how can the job 
be configured to restart and recover from the last successful checkpoint 
instead of staying in the FAILING state?
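
For completeness, is something along the lines of the following sketch the 
right way to enable that behavior? The attempt count, delay, and checkpoint 
interval below are arbitrary placeholder values:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartConfigSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);  // needed so there is a checkpoint to restore from

            // Restart the job up to 10 times, waiting 30 seconds between attempts;
            // after a restart, state should be restored from the last successful checkpoint.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.of(30, TimeUnit.SECONDS)));

            // ... source/sink wiring as in the first sketch ...
            env.execute("kafka-to-s3");
        }
    }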
