davseitsev opened a new issue #1398:
URL: https://github.com/apache/iceberg/issues/1398


   I'm building a Spark Structured Streaming application that writes data to 
Amazon S3 in Iceberg format. Sometimes the query fails without retries due to 
S3 eventual consistency.
   ```
   20/08/28 10:15:08 ERROR MicroBatchExecution: Query 
event-streaming-query-76-v0 [id = bea88375-dd55-4eb1-bc4a-44ed9f5fdbf9, runId = 
7536644f-1ff8-42fd-b9b7-812fcdab1e21] terminated with error
   org.apache.spark.SparkException: Writing job aborted.
        at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:283)
        at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:332)
        at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
        at 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
        at 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
        at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
   Caused by: org.apache.iceberg.exceptions.NotFoundException: Failed to open 
input stream for file: 
s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
        at 
org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:159)
        at 
org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:95)
        at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
        at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
        at 
org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
        at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
        at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
        at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
        at org.apache.iceberg.FastAppend.apply(FastAppend.java:142)
        at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:149)
        at 
org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:262)
        at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:403)
        at 
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:188)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:261)
        at 
org.apache.iceberg.spark.source.Writer.commitOperation(Writer.java:149)
        at 
org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:93)
        at 
org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:86)
        at 
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter.commit(MicroBatchWriter.scala:31)
        at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
        ... 35 more
   Caused by: java.io.FileNotFoundException: No such file or directory: 
s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
        at 
org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:157)
        ... 56 more
   ```
   As far as I understand, this happens in `SnapshotProducer` during the commit 
operation. `FastAppend` tries to read the manifest list in its `apply()` method 
and gets a `java.io.FileNotFoundException` from 
`S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)`. But the file actually 
exists.
   
   It's a known issue with `S3AFileSystem`: it checks whether a file exists 
before creating it, which breaks S3's read-after-write consistency guarantee. 
So when another client wants to read the newly created file, it can get a 
`FileNotFoundException`.
   The problem is that `SnapshotProducer` retries only `CommitFailedException` 
and doesn't retry any `IOException`.
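   To illustrate the kind of behavior being asked for, here is a minimal sketch of a retry helper that treats `FileNotFoundException` as transient and retries the read with backoff. This is hypothetical code, not Iceberg's actual API: `withRetry` and the backoff values are made up for illustration, and the `main` method only simulates an object that becomes visible on the third read attempt.
   ```java
   import java.io.FileNotFoundException;
   import java.util.concurrent.Callable;

   // Hypothetical sketch (not Iceberg's API): retry a read that may fail with
   // FileNotFoundException because the S3 object is not yet visible.
   public class EventualConsistencyRetry {

       public static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMs)
               throws Exception {
           FileNotFoundException last = null;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               try {
                   return action.call();
               } catch (FileNotFoundException e) {
                   // The object may exist but not be visible yet: wait and retry.
                   last = e;
                   Thread.sleep(backoffMs * attempt);
               }
           }
           throw last;
       }

       public static void main(String[] args) throws Exception {
           // Simulate a manifest list that only becomes visible on the third read.
           int[] calls = {0};
           String contents = withRetry(() -> {
               calls[0]++;
               if (calls[0] < 3) {
                   throw new FileNotFoundException("No such file or directory");
               }
               return "manifest-list-bytes";
           }, 5, 10);
           System.out.println(contents + " after " + calls[0] + " attempts");
       }
   }
   ```
   Something along these lines around the manifest-list read (or widening the retryable exceptions in the commit's retry loop) would let the commit survive the visibility lag instead of aborting the whole query.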
   

