Hi Stephan,

I am facing an S3 consistency-related issue; the exception is pasted at the
end.

We were able to solve the S3 sync issue by adding System.currentTimeMillis() to
the inProgressPrefix, inProgressSuffix, pendingPrefix and pendingSuffix
properties of BucketingSink.
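
For reference, the sink configuration was roughly along these lines (a
simplified sketch; the SequenceFileWriter key/value types and the base path
are placeholders, not our exact job code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Append the job start time to the part-file prefixes/suffixes so that a
// restarted job never writes to the same in-progress/pending key on S3.
String ts = String.valueOf(System.currentTimeMillis());

BucketingSink<Tuple2<IntWritable, Text>> sink =
        new BucketingSink<>("s3://<bucket-name>/");
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setInProgressPrefix("_" + ts + "-");
sink.setInProgressSuffix("." + ts + ".in-progress");
sink.setPendingPrefix("_" + ts + "-");
sink.setPendingSuffix("." + ts + ".pending");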

I also tried another approach: updating the BucketingSink code itself to
append System.currentTimeMillis() to the partPath variable (in the
openNewPartFile method).
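
The change in openNewPartFile is essentially the following (paraphrased; the
surrounding code and exact variable names differ slightly between Flink
versions):

// Inside BucketingSink#openNewPartFile(...), where the part path is built.
// Before (roughly):
//   Path partPath = new Path(bucketPath,
//           partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
// After my change, the current time is appended so a restarted task never
// reuses an existing S3 key:
Path partPath = new Path(bucketPath,
        partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter
                + "-" + System.currentTimeMillis());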

Can you please let me know if this is the correct approach to get rid of this
exception?

TimerException{java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress
        at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
        at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
        at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)


Regards,
Vinay Patil



