[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16733209#comment-16733209 ]
Gabor Somogyi commented on SPARK-26359:
---------------------------------------

[~Tint] did the suggested workaround work? I would close this, because it happened due to S3's lack of read-after-write consistency.
[~ste...@apache.org]
{quote}With S3 the time to rename is about 6-10MB/s{quote}
all I can say is wow :)

> Spark checkpoint restore fails after query restart
> --------------------------------------------------
>
>                 Key: SPARK-26359
>                 URL: https://issues.apache.org/jira/browse/SPARK-26359
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Submit, Structured Streaming
>    Affects Versions: 2.4.0
>        Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different queries
> Queries are written using Structured Streaming
> We are using the following committer algorithm in hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # When the default is 1
>            Reporter: Kaspar Tint
>            Priority: Major
>         Attachments: driver-redacted, metadata, redacted-offsets, state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be restarted after a usual S3 checkpointing failure. To clarify before everything else: we are aware of the issues with S3 and are working towards moving to HDFS, but this will take time. S3 causes queries to fail quite often during peak hours, and we have separate logic that attempts to restart the failed queries when possible.
> In this particular case we could not restart one of the failed queries. It seems that between detecting the failure in the query and starting it up again something went really wrong with Spark, and the state in the checkpoint folder got corrupted.
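For context, the setup described in the environment boils down to roughly the following sketch. Only the checkpoint location and the committer setting are taken from this report; the Kafka source, sink path, query name and trigger interval are illustrative assumptions, since the actual queries are redacted.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("insights-spark-aggregates") // hypothetical application name
  // Committer algorithm version 2, as configured in the report (the default is 1).
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

// Hypothetical streaming source; the real queries are not shown in the report.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "feedback-events")
  .load()

// Each of the 4 queries checkpoints to S3 through the s3a connector.
val feedback = events.writeStream
  .queryName("feedback")
  .format("parquet")
  .option("path", "s3a://some.domain/spark/output/feedback") // hypothetical sink path
  .option("checkpointLocation", "s3a://some.domain/spark/checkpoints/49/feedback")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
{code}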
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
> java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>         at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
>         at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>         at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>         at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>         at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
>         at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
>         at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>         at scala.Option.getOrElse(Option.scala:121)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback terminated with exception, attempting restart
> {code}
> At the last line we claim that a restart will be attempted for the query named *feedback*.
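For reference, the failing frames above (HDFSMetadataLog.add -> RenameBasedFSDataOutputStream.close -> FileContextBasedCheckpointFileManager.renameTempFile) come from Spark committing checkpoint metadata by writing a hidden .tmp file and then renaming it into place. A much simplified sketch of that pattern, not Spark's actual code and with a made-up helper name:

{code:scala}
import java.util.{EnumSet, UUID}

import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}

// Simplified sketch of the rename-based commit used for checkpoint metadata:
// write the batch to a hidden ".<name>.<uuid>.tmp" file, then rename it into place.
def writeBatchAtomically(fc: FileContext, finalFile: Path, bytes: Array[Byte]): Unit = {
  val tmpFile = new Path(finalFile.getParent, s".${finalFile.getName}.${UUID.randomUUID()}.tmp")
  val out = fc.create(tmpFile, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))
  try {
    out.write(bytes)
  } finally {
    out.close()
  }
  // If this rename never completes, readers keep looking for a final file that does not
  // exist, and only the .tmp file is left behind.
  fc.rename(tmpFile, finalFile, Options.Rename.OVERWRITE)
}
{code}

On HDFS that final rename is a cheap, atomic metadata operation; on S3A it is implemented as a copy followed by a delete, so it is slow and can fail or be observed inconsistently partway through, which matches the stranded .tmp file shown later in this report.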
> We start the query up and encounter this almost immediately
> {code:java}
> 2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback currently not running, starting query in own scheduling pool
> 2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): java.lang.IllegalStateException: Error reading delta file s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of HDFSStateStoreProvider[id = (op=2,part=11),dir = s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does not exist
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
>         at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
>         at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
>         at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
>         at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>         at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
>         at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
>         at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
>         at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
>         at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>         at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
>         at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
>         ... 28 more
> {code}
> And this goes on forever until we bump the checkpoint folder name.
> {code:java}
> 2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): java.lang.IllegalStateException: Error committing version 49464 into HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
>         at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
>         .....
> {code}
> Now when looking into S3 it indeed looks like this delta file was never created. Instead we have a
> {code:java}
> s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
> {code}
> file that I assume keeps that name for as long as the whole operation has not finished. So this file never got renamed to 36870.delta, and the application keeps trying to reference it.
> I have attached all the relevant redacted logs to this report, together with ls output of the S3 folders and the metadata file. If any more information is needed, I would be happy to provide it. We would also appreciate some input on how best to resolve this issue. For now it has happened on 2 separate days, and the solution has been to bump the checkpoint folder.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org