[ https://issues.apache.org/jira/browse/BEAM-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123027#comment-17123027 ]
Beam JIRA Bot commented on BEAM-5934:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.

> FileSink affected by S3 eventual consistency
> --------------------------------------------
>
>                 Key: BEAM-5934
>                 URL: https://issues.apache.org/jira/browse/BEAM-5934
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-hadoop-file-system
>   Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Priority: P2
>              Labels: stale-P2
>
> After upgrading to Beam 2.7.0 and Flink 1.5.2 (from Beam 2.2.0), the job
> throws a _No such file or directory_ exception a few times every day when
> trying to move a temp bundle to its final location.
> After digging into the code, it looks like an S3 eventual consistency
> problem: the HEAD request, used to check whether the temporary file exists
> before copying it to the final location, returns 404 and the whole copy
> operation fails.
> We use sharded writes (32). The job outputs around 256 files a minute, but
> the exception is thrown at most 3 times a day, which suggests that there is
> a race condition somewhere.
>
> S3 falls back to eventual consistency when a check for the file's existence
> is made before the file is uploaded. I checked the Beam FileSink and
> couldn't find any logic that pre-checks whether the temp bundle file exists.
>
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in
> your S3 bucket in all regions with one caveat. The caveat is that if you make
> a HEAD or GET request to the key name (to find if the object exists) before
> creating the object, Amazon S3 provides eventual consistency for
> read-after-write.
> {quote}
>
> The logs from the job:
> {code:java}
> 2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
> 2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - Successfully wrote temporary file s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> 2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz
> {code}
>
> {code:java}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:628)
>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
>     at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:204)
>     at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
>     at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:71)
>     at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
>     at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
>     at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
>     at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
>     at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
>     at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
>     at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
>     at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
>     at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
>     at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
>     at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:367)
>     at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
>     at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.copy(HadoopFileSystem.java:132)
>     at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:288)
>     at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:761)
>     at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:797)
> {code}
>
> CloudTrail logs (note that the order of API calls shown may not reflect the
> actual order, because eventtime has only one-second precision):
> ||eventtime||operation||errormessage||requestparameters||
> |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:03Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
> |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
> |2018-10-29T17:45:04Z|PutObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:05Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
> |2018-10-29T17:45:06Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
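The failure reported in this issue is a transient FileNotFoundException raised while copying a temp file that had just been written. One possible mitigation, shown here only as a minimal sketch, is to retry the copy with a short exponential backoff when the source is reported missing. The class EventualConsistencyRetry, the IoAction interface, and the parameter names are hypothetical and are not part of Beam or Hadoop; nothing in the issue indicates that Beam handles the problem this way.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

/** Hypothetical helper for illustration only; not a Beam or Hadoop API. */
final class EventualConsistencyRetry {

    /** An I/O step that may fail, e.g. copying a temp file to its final name. */
    @FunctionalInterface
    interface IoAction {
        void run() throws IOException;
    }

    private EventualConsistencyRetry() {}

    /**
     * Runs the action, retrying only on FileNotFoundException and doubling
     * the wait between attempts. Returns the number of attempts used.
     * Other IOExceptions propagate immediately without a retry.
     */
    static int runWithRetry(IoAction action, int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMillis = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (FileNotFoundException e) {
                if (attempt >= maxAttempts) {
                    throw e; // still missing after the last attempt: give up
                }
                Thread.sleep(backoffMillis);
                backoffMillis *= 2;
            }
        }
    }
}
```

A caller would wrap the copy step in the action, for example runWithRetry(() -> copyTempToFinal(), 5, 200L), where copyTempToFinal is a placeholder for whatever performs the copy. Retrying is only safe if that step can be repeated without side effects, and it hides a missing file only for as long as the backoff budget lasts.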