Kaspar Tint created SPARK-26359:
-----------------------------------

             Summary: Spark checkpoint restore fails after query restart
                 Key: SPARK-26359
                 URL: https://issues.apache.org/jira/browse/SPARK-26359
             Project: Spark
          Issue Type: Bug
          Components: Spark Submit, Structured Streaming
    Affects Versions: 2.4.0
         Environment: Spark 2.4.0 deployed in standalone-client mode
Checkpointing is done to S3
The Spark application in question is responsible for running 4 different queries
Queries are written using Structured Streaming
            Reporter: Kaspar Tint


We had an incident where one of our Structured Streaming queries could not be 
restarted after a usual S3 checkpointing failure. To clarify up front: we are 
aware of the issues with S3 and are working towards moving to HDFS, but this 
will take time. S3 causes queries to fail quite often during peak hours, and we 
have separate logic that detects these failures and attempts to restart the 
failed queries when possible.
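
For context, that restart logic is roughly of the following shape. This is only a minimal sketch of the kind of supervision loop described above, not our actual code; {{superviseQuery}} and the {{start}} factory are hypothetical names.

{code:scala}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Hypothetical sketch of a restart loop: `start` is a factory that (re)builds and
// starts the query from the same checkpoint location.
def superviseQuery(name: String, start: () => StreamingQuery): Unit = {
  var query = start()
  while (true) {
    try {
      // Rethrows the query's failure, e.g. the S3 FileNotFoundException shown below
      query.awaitTermination(10000L)
    } catch {
      case e: StreamingQueryException =>
        println(s"Query $name terminated with exception, attempting restart: ${e.getMessage}")
        query = start() // restart from the same checkpoint; this is where we get stuck
    }
  }
}
{code}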

In this particular case we could not restart one of the failed queries. It 
seems that between detecting the query failure and starting it up again, 
something went wrong inside Spark and the state in the checkpoint folder was 
left corrupted.

The issue starts with the usual *FileNotFoundException* that happens with S3
{code:java}
2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
        at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
        at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
        at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
        at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
        at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
        at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
        at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback terminated with exception, attempting restart
{code}

The last line above is our own restart logic announcing that a restart will be 
attempted for the query named *feedback*. We start the query up and run into 
this almost immediately:

{code:java}
2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback currently not running, starting query in own scheduling pool
2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): java.lang.IllegalStateException: Error reading delta file s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of HDFSStateStoreProvider[id = (op=2,part=11),dir = s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does not exist
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
        at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
        at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
        at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
        at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
        at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
        at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
        at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
        ... 28 more
{code}

This repeats forever until we bump the checkpoint folder name.

{code:java}
2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): java.lang.IllegalStateException: Error committing version 49464 into HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
        at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)

    .....
{code}



Now when looking into S3, it indeed appears that this delta file was never 
created. Instead we have a
{code:java}
s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
{code}
file, which I assume keeps that temporary name for as long as the write 
operation has not finished. So this file never got renamed to 36870.delta, yet 
the application keeps trying to reference the final name.
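
My understanding is that the checkpoint file manager commits files by writing to a temporary name and then renaming on close (the stack trace above goes through FileContextBasedCheckpointFileManager.renameTempFile and RenameBasedFSDataOutputStream in CheckpointFileManager.scala). The sketch below is only a simplified illustration of that pattern, not Spark's actual code; {{writeAtomically}} is a hypothetical helper.

{code:scala}
import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}

// Simplified write-temp-then-rename commit, as I understand Spark's checkpoint writes.
def writeAtomically(fc: FileContext, finalPath: Path, bytes: Array[Byte]): Unit = {
  val tmpPath = new Path(finalPath.getParent,
    s".${finalPath.getName}.${java.util.UUID.randomUUID()}.tmp")
  val out = fc.create(tmpPath, java.util.EnumSet.of(CreateFlag.CREATE))
  try out.write(bytes) finally out.close()
  // If this rename fails (S3A emulates rename with copy+delete, so it can fail mid-way),
  // only the ".<name>.<uuid>.tmp" file is left behind and the final file never appears --
  // which matches the state we found for 36870.delta.
  fc.rename(tmpPath, finalPath, Options.Rename.NONE)
}
{code}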

I will attach all the relevant redacted logs to this report, together with ls 
output of the S3 folders and the metadata file. If any more information is 
needed, I would be happy to provide it. I would also appreciate some input on 
how best to resolve this issue. So far it has happened on 2 separate days, and 
the only workaround has been to bump the checkpoint folder, as sketched below.
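
For completeness, the workaround amounts to something like the following hypothetical sketch (the DataFrame, sink format and helper name are placeholders, not our actual job code): start the query against a fresh checkpoint root, accepting that the old state is abandoned.

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Placeholder illustration of "bumping" the checkpoint: point the query at a new
// checkpoint root (e.g. .../checkpoints/50) so it no longer reads the corrupted state.
def startWithFreshCheckpoint(df: DataFrame, checkpointRoot: String): StreamingQuery =
  df.writeStream
    .queryName("feedback")
    .outputMode("update")
    .option("checkpointLocation", s"$checkpointRoot/feedback")
    .format("console") // sink format is a placeholder
    .start()
{code}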


