Hi, does anyone have an idea of what causes this and how it can be resolved? Thanks.
‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Wednesday, December 5, 2018 12:44 AM, Flink Developer <developer...@protonmail.com> wrote:

> I have a Flink app with high parallelism (400) running in AWS EMR. It uses
> Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (with the
> RocksDB backend for checkpointing). The destination is defined using the
> "s3a://" prefix. The Flink job is a streaming app which runs continuously. At
> any given time, each worker may be writing to a part file in S3, so all
> workers combined could potentially be writing to 400 files (due to the
> parallelism of 400).
>
> After a few days, one of the workers fails with this exception:
>
> org.apache.hadoop.fs.s3a.AWSS3IOException:
> copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress,
> bucket/2018-09-01/05/_file-10-1.gz.pending):
> com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal
> error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError;
> Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy)
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>
> This seems to occur randomly when the BucketingSink rolls to a new part file.
> The odd thing is that it happens at random times but consistently across
> separate job executions. When it occurs, it hits only 1 of the parallel Flink
> workers (not all of them). Also, when this happens, the Flink job transitions
> into a FAILING state, but it does not restart and resume/recover from the
> last successful checkpoint.
>
> What is the cause of this and how can it be resolved? Also, how can the job
> be configured to restart/recover from the last successful checkpoint instead
> of staying in the FAILING state?
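
For reference, here is roughly how I would expect the checkpointing and restart configuration to look on Flink 1.5. This is only a sketch, not the actual job code: the bucket path, intervals, and attempt counts are made up for illustration, and the Kafka source / BucketingSink wiring is omitted.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes; checkpoint data goes to S3 via the RocksDB backend.
        env.enableCheckpointing(5 * 60 * 1000L);
        env.setStateBackend(new RocksDBStateBackend("s3a://my-bucket/checkpoints"));

        // According to the Flink docs, when checkpointing is enabled and no restart
        // strategy is configured, a fixed-delay strategy with Integer.MAX_VALUE attempts
        // is used by default. Setting one explicitly makes the recovery behavior visible.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                              // number of restart attempts
                Time.of(30, TimeUnit.SECONDS)   // delay between attempts
        ));

        // ... Kafka source and BucketingSink to s3a:// would be wired up here ...

        env.execute("kafka-to-s3");
    }
}

If I understand correctly, the same behavior can also be set cluster-wide via restart-strategy: fixed-delay (plus the corresponding attempts/delay keys) in flink-conf.yaml instead of in the job code. What I don't understand is why the job stays in FAILING rather than applying whatever restart strategy is in effect.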