davseitsev opened a new issue #1398:
URL: https://github.com/apache/iceberg/issues/1398
I'm building a Spark Structured Streaming application that writes data to
Amazon S3 in Iceberg format. Sometimes the query fails without retries due to
S3 eventual consistency.
```
20/08/28 10:15:08 ERROR MicroBatchExecution: Query event-streaming-query-76-v0 [id = bea88375-dd55-4eb1-bc4a-44ed9f5fdbf9, runId = 7536644f-1ff8-42fd-b9b7-812fcdab1e21] terminated with error
org.apache.spark.SparkException: Writing job aborted.
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:283)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:332)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:159)
	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:95)
	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
	at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
	at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
	at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
	at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
	at org.apache.iceberg.FastAppend.apply(FastAppend.java:142)
	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:149)
	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:262)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:403)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:188)
	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:261)
	at org.apache.iceberg.spark.source.Writer.commitOperation(Writer.java:149)
	at org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:93)
	at org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:86)
	at org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter.commit(MicroBatchWriter.scala:31)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
	... 35 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:157)
	... 56 more
```
As far as I understand, this happens in `SnapshotProducer` during the commit
operation. `FastAppend` tries to read the manifest list in its `apply()` method
and gets a `java.io.FileNotFoundException` from
`S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)`, even though the file
actually exists.
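In the trace, the S3A `FileNotFoundException` reaches the commit path wrapped in Iceberg's `NotFoundException`, so any code that wants to treat it specially has to look through the cause chain rather than at the top-level exception type. A minimal JDK-only sketch of such a check follows; `CauseChainDemo`, `hasCause`, and `WrappedNotFoundException` are hypothetical names, the last one standing in for Iceberg's `NotFoundException`, which is not used here.

```java
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

class CauseChainDemo {
    // Returns true if t, or any throwable in its cause chain, is an instance of type.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        // Identity-based set so a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new WrappedNotFoundException(
                "Failed to open input stream for file",
                new FileNotFoundException("No such file or directory"));
        System.out.println(hasCause(wrapped, FileNotFoundException.class)); // true
        System.out.println(hasCause(new IllegalStateException("unrelated"),
                FileNotFoundException.class)); // false
    }
}

// Hypothetical stand-in for a runtime wrapper such as Iceberg's NotFoundException.
class WrappedNotFoundException extends RuntimeException {
    WrappedNotFoundException(String message, Throwable cause) {
        super(message, cause);
    }
}
```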
This is a known issue with `S3AFileSystem`: it checks whether a file exists
before creating it, and on S3 that lookup of a not-yet-existing key gives up the
read-after-write consistency that newly created objects otherwise get. As a
result, another client that reads the newly created file can get a
`FileNotFoundException`.
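To make the consistency issue concrete, here is a toy in-memory model, assuming that a lookup of a missing key leaves a "negative cache" entry that keeps reporting the key as absent for a while, even after the object has been written. `ToyNegativeCacheStore`, its logical clock, and the TTL value are illustrative assumptions, not a model of real S3 timings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (not real S3 semantics or timings): a lookup of a missing key
// records a negative-cache entry that hides the key until it expires.
class ToyNegativeCacheStore {
    private final Map<String, String> objects = new HashMap<>();
    // key -> logical time at which its negative-cache entry expires
    private final Map<String, Long> negativeUntil = new HashMap<>();
    private final long negativeTtl;
    private long now = 0; // manual logical clock keeps the demo deterministic

    ToyNegativeCacheStore(long negativeTtl) {
        this.negativeTtl = negativeTtl;
    }

    void advance(long ticks) {
        now += ticks;
    }

    // Existence check, analogous to the check done before creating a file.
    boolean exists(String key) {
        return get(key).isPresent();
    }

    void put(String key, String value) {
        objects.put(key, value);
    }

    Optional<String> get(String key) {
        Long until = negativeUntil.get(key);
        if (until != null && now < until) {
            return Optional.empty(); // stale "not found" from the earlier miss
        }
        negativeUntil.remove(key);
        String value = objects.get(key);
        if (value == null) {
            negativeUntil.put(key, now + negativeTtl); // remember the miss
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        ToyNegativeCacheStore store = new ToyNegativeCacheStore(5);
        String key = "metadata/snap-example.avro"; // hypothetical key
        System.out.println(store.exists(key));          // false: check before create
        store.put(key, "manifest list");
        System.out.println(store.get(key).isPresent()); // false: stale miss
        store.advance(5);
        System.out.println(store.get(key).isPresent()); // true: entry expired
    }
}
```

Without the preceding `exists()` call, the same `put()` followed by `get()` returns the object immediately; in this model it is the check done before creating the file that opens the stale window.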
The problem is that `SnapshotProducer` retries only on `CommitFailedException`
and doesn't retry on any `IOException` (here the `FileNotFoundException`
surfaces wrapped in Iceberg's `NotFoundException`).
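One way to tolerate this is to retry a bounded number of times when the failure is caused by a not-found error. The sketch below is a standalone JDK-only illustration, not a patch to the `Tasks`-based commit loop seen in the trace; `RetryOnNotFound.withRetry` and its backoff parameters are hypothetical.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

class RetryOnNotFound {
    // Runs action, retrying with exponential backoff only when the failure (or any of
    // its causes) is a FileNotFoundException; any other exception propagates at once.
    static <T> T withRetry(Callable<T> action, int maxAttempts, long initialBackoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !causedByNotFound(e)) {
                    throw e; // out of attempts, or not a not-found failure
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 10_000L); // double, capped at 10 s
            }
        }
    }

    // Walks the cause chain; the depth cap guards against cyclic chains.
    private static boolean causedByNotFound(Throwable t) {
        for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
            if (t instanceof FileNotFoundException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = withRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                // Simulates the wrapped not-found failure from the trace.
                throw new RuntimeException("Failed to open input stream",
                        new FileNotFoundException("No such file or directory"));
            }
            return "manifest list read";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints "manifest list read after 3 attempts"
    }
}
```

The attempt cap means a file that genuinely does not exist still fails after a short delay, and only not-found causes are retried rather than every `IOException`, since other I/O errors are less likely to clear up by waiting.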