[ https://issues.apache.org/jira/browse/BEAM-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chamikara Jayalath reassigned BEAM-5934:
----------------------------------------

    Assignee:     (was: Chamikara Jayalath)

> FileSink affected by S3 eventual consistency
> --------------------------------------------
>
>                 Key: BEAM-5934
>                 URL: https://issues.apache.org/jira/browse/BEAM-5934
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-hadoop
>    Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Priority: Major
>
> After upgrading from Beam 2.2.0 to Beam 2.7.0 and Flink 1.5.2, the job throws a
> _No such file or directory_ exception a few times a day when trying to
> move a temp bundle to its final location.
> After digging into the code, this looks like an S3 eventual consistency
> problem: the HEAD request used to check whether the temporary file exists
> before copying it to the final location returns 404, and the whole copy
> operation fails.
> We use sharded writes (32 shards). The job outputs around 256 files a minute,
> but the exception is thrown at most 3 times a day, which suggests that there
> is a race condition somewhere.
>  
> S3 falls back to eventual consistency when a check for the file's existence
> is made before the file is uploaded. I checked the Beam FileSink and
> couldn't find any logic that pre-checks whether the temp bundle file
> exists.
>  
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in
> your S3 bucket in all regions with one caveat. The caveat is that if you make
> a HEAD or GET request to the key name (to find if the object exists) before
> creating the object, Amazon S3 provides eventual consistency for
> read-after-write.{quote}
>  
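As a toy illustration of the caveat quoted above (an assumption made for exposition only, not a description of how S3 is implemented), the sketch below models a store that remembers a "not found" answer for a short time. `ToyStore`, its methods, and the TTL parameter are all hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: it does NOT describe S3 internals. */
class ToyStore {
    private final Set<String> objects = new HashSet<>();
    // Keys that were probed while missing, mapped to the time the miss expires.
    private final Map<String, Long> rememberedMisses = new HashMap<>();
    private final long missTtlMillis;

    ToyStore(long missTtlMillis) {
        this.missTtlMillis = missTtlMillis;
    }

    /** Existence check at an explicit (simulated) time, in milliseconds. */
    boolean head(String key, long nowMillis) {
        Long expiry = rememberedMisses.get(key);
        if (expiry != null && nowMillis < expiry) {
            return false; // stale "not found", even if the object exists by now
        }
        rememberedMisses.remove(key);
        if (objects.contains(key)) {
            return true;
        }
        rememberedMisses.put(key, nowMillis + missTtlMillis);
        return false;
    }

    /** Creates the object; remembered misses are deliberately left in place. */
    void put(String key) {
        objects.add(key);
    }
}
```

In this model a key that is probed before it is written stays invisible until the remembered miss expires, while a key written without a prior probe is visible immediately.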
> The logs from the job
> {code:java}
> 2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening 
> writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window 
> [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane 
> PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination 
> null
> 2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - 
> Successfully wrote temporary file 
> s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> 2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy 
> temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-
> 10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, 
> window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), 
> paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to 
> final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz{code}
>  
> {code:java}
> Caused by: org.apache.beam.sdk.util.UserCodeException: 
> java.io.FileNotFoundException: No such file or directory: 
> s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
> at 
> org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at 
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
> at 
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
> at 
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:628)
> at 
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
> at 
> org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:204)
> at 
> org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at 
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
> at 
> org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:71)
> at 
> org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at 
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
> at 
> org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
> at 
> org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
> at 
> org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at 
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
> at 
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
> at 
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> at 
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> at 
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> at 
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
> at 
> org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
> at 
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> at 
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: 
> s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:367)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
> at 
> org.apache.beam.sdk.io.hdfs.HadoopFileSystem.copy(HadoopFileSystem.java:132)
> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:288)
> at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:761)
> at 
> org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:797){code}
>  
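One possible mitigation, sketched here as an assumption rather than as Beam's or Hadoop's actual behaviour, is to retry the copy when the source file is reported missing. `RetryOnNotFound`, `IoAction`, and `runWithRetry` are hypothetical names; the action passed in would wrap whatever call performs the copy.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

class RetryOnNotFound {
    /** Hypothetical callback type for any I/O step, such as a file copy. */
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the action, retrying with exponential backoff while it fails with
     * FileNotFoundException. Returns the number of attempts that were made.
     */
    static int runWithRetry(IoAction action, int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoffMillis = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (FileNotFoundException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the original error
                }
                Thread.sleep(backoffMillis);
                backoffMillis *= 2; // double the wait before the next attempt
            }
        }
    }
}
```

Retrying only narrows the window in which the failure can occur; it would not address the existence check that the report suspects precedes the upload.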
> CloudTrail logs (note that the presented order of API calls might not
> reflect reality, as eventtime has only one-second precision):
> ||eventtime||operation||errormessage||requestparameters||
> |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:03Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
> |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
> |2018-10-29T17:45:04Z|PutObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:05Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
> |2018-10-29T17:45:06Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
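The pattern the report points at in this table (a HeadObject that returns NoSuchKey for a key before the PutObject that creates it) can be checked mechanically. The sketch below is a minimal, hypothetical helper, not part of Beam or any AWS SDK; the `Event` class and `keysProbedBeforeCreate` are illustrative names, and it trusts the input order, which the one-second eventtime precision does not guarantee.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class HeadBeforePut {
    /** Hypothetical, simplified view of one CloudTrail row. */
    static final class Event {
        final String operation; // e.g. "HeadObject", "PutObject"
        final String error;     // e.g. "NoSuchKey", or "" when the call succeeded
        final String key;

        Event(String operation, String error, String key) {
            this.operation = operation;
            this.error = error;
            this.key = key;
        }
    }

    /** Returns keys whose PutObject was preceded by a HeadObject that got NoSuchKey. */
    static Set<String> keysProbedBeforeCreate(List<Event> eventsInOrder) {
        Set<String> probedWhileMissing = new HashSet<>();
        Set<String> flagged = new LinkedHashSet<>();
        for (Event e : eventsInOrder) {
            if ("HeadObject".equals(e.operation) && "NoSuchKey".equals(e.error)) {
                probedWhileMissing.add(e.key);
            } else if ("PutObject".equals(e.operation) && probedWhileMissing.contains(e.key)) {
                flagged.add(e.key);
            }
        }
        return flagged;
    }
}
```

Applied to the rows above in the order shown, the temp file key would be flagged, because its first HeadObject (NoSuchKey) is listed before the PutObject.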



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
