[ https://issues.apache.org/jira/browse/SPARK-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14245267#comment-14245267 ]

Tathagata Das commented on SPARK-4835:
--------------------------------------

I wonder whether there is a semantically cleaner way of doing this. In 
streaming we know which batches might already have been processed, because 
we store in the checkpoint the batches that were generated and queued. 
So upon recovery, the validation could be disabled only for the batches 
that were queued before the failure, and not for any subsequent batches. 
What do you think?
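
A rough standalone sketch of that idea, with made-up names (the real 
JobGenerator / Checkpoint internals differ):

{code}
// Sketch only, not the actual JobGenerator API: remember which batch
// times were pending in the recovered checkpoint, and skip output
// validation only for those batches.
case class Time(milliseconds: Long)

class RecoveryAwareValidation(pendingTimesInCheckpoint: Seq[Time]) {
  private val recovered: Set[Time] = pendingTimesInCheckpoint.toSet

  // Validation stays on for every batch generated after recovery.
  def shouldValidateOutput(batchTime: Time): Boolean =
    !recovered.contains(batchTime)
}

object RecoveryAwareValidationDemo extends App {
  // Suppose batches 1000 ms and 1500 ms were queued when the driver died.
  val check = new RecoveryAwareValidation(Seq(Time(1000), Time(1500)))
  println(check.shouldValidateOutput(Time(1500))) // false: pre-failure batch
  println(check.shouldValidateOutput(Time(2000))) // true: post-recovery batch
}
{code}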


> Streaming saveAs*HadoopFiles() methods may throw FileAlreadyExistsException 
> during checkpoint recovery
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4835
>                 URL: https://issues.apache.org/jira/browse/SPARK-4835
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Josh Rosen
>            Assignee: Tathagata Das
>            Priority: Critical
>
> While running (a slightly modified version of) the "recovery with 
> saveAsHadoopFiles operation" test in the streaming CheckpointSuite, I noticed 
> the following error message in the streaming driver log:
> {code}
> 14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Added jobs for time 1500 ms
> 14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO RecurringTimer: Started timer for JobGenerator at time 2000
> 14/12/12 17:42:50.688 sparkDriver-akka.actor.default-dispatcher-3 INFO JobScheduler: Starting job streaming job 1500 ms.0 from job set of time 1500 ms
> 14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobGenerator: Restarted JobGenerator at 2000 ms
> 14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Started JobScheduler
> 14/12/12 17:42:50.689 sparkDriver-akka.actor.default-dispatcher-3 INFO JobScheduler: Starting job streaming job 1500 ms.1 from job set of time 1500 ms
> 14/12/12 17:42:50.689 sparkDriver-akka.actor.default-dispatcher-3 ERROR JobScheduler: Error running job streaming job 1500 ms.0
> org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/var/folders/0k/2qp2p2vs7bv033vljnb8nk1c0000gn/T/1418434967213-0/-1500.result already exists
>       at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:121)
>       at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1045)
>       at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
>       at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:677)
>       at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:675)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>       at scala.util.Try$.apply(Try.scala:161)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> 14/12/12 17:42:50.691 pool-12-thread-1 INFO SparkContext: Starting job: apply at Transformer.scala:22
> {code}
> Spark Streaming's {{saveAsHadoopFiles}} method calls Spark's 
> {{rdd.saveAsHadoopFile}} method.  That method, in turn, calls 
> {{PairRDDFunctions.saveAsHadoopDataset()}}, which performs an error 
> check to ensure that the output directory does not already exist:
> {code}
>     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
>       // FileOutputFormat ignores the filesystem parameter
>       val ignoredFs = FileSystem.get(hadoopConf)
>       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
>     }
> {code}
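> As that guard shows, one blunt mitigation available today is to turn the 
> validation off globally via the same setting, at the cost of losing 
> overwrite protection everywhere; a minimal sketch:
> {code}
> import org.apache.spark.SparkConf
>
> // Sketch: disable output-spec validation for the whole application.
> // This avoids the recovery failure, but also silences legitimate
> // "output directory already exists" errors for every job.
> val conf = new SparkConf()
>   .setAppName("checkpointed-streaming-app") // placeholder app name
>   .set("spark.hadoop.validateOutputSpecs", "false")
> {code}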
> If Spark Streaming recovers from a checkpoint and re-runs the last batch 
> recorded in that checkpoint, then {{saveAsHadoopDataset}} will have been 
> called twice with the same output path.  If the output path already 
> exists from the first, pre-recovery run, then the recovery will fail.
> This could be a pretty serious issue: imagine that a streaming job fails 
> partway through a save() operation and then recovers.  In that case, the 
> partially-written output directory will prevent us from ever recovering 
> and finishing the save().
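> To make the failure mode concrete, here is the rough shape of a job that 
> can hit it (a sketch, not the actual test; the checkpoint directory and 
> socket source are placeholders):
> {code}
> import org.apache.hadoop.mapred.TextOutputFormat
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val checkpointDir = "/tmp/checkpoint" // placeholder
> val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
>   val conf = new SparkConf().setMaster("local[2]").setAppName("repro")
>   val ssc = new StreamingContext(conf, Seconds(1))
>   ssc.checkpoint(checkpointDir)
>   val pairs = ssc.socketTextStream("localhost", 9999).map(w => (w, 1))
>   // Each batch writes to a time-derived path like "prefix-1500.result".
>   // If the driver dies after a batch has created its directory but
>   // before the checkpoint records it as done, recovery re-runs the
>   // batch against the same path and checkOutputSpecs() throws.
>   pairs.saveAsHadoopFiles[TextOutputFormat[String, Int]]("prefix", "result")
>   ssc
> })
> ssc.start()
> ssc.awaitTermination()
> {code}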
> Fortunately, this should be simple to fix: we should disable the 
> output-directory existence check for output operations launched by 
> streaming jobs.
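> One way to scope that (a sketch of one possible mechanism, not 
> necessarily the shape of the eventual patch) is a dynamically-scoped 
> flag that the streaming scheduler sets around the jobs it re-runs, and 
> that {{saveAsHadoopDataset()}} consults in addition to 
> {{spark.hadoop.validateOutputSpecs}}:
> {code}
> import scala.util.DynamicVariable
>
> object OutputValidationSketch {
>   // Thread-local flag, set only around output operations that the
>   // streaming scheduler re-runs during checkpoint recovery.
>   val disableOutputSpecValidation = new DynamicVariable[Boolean](false)
>
>   // Streaming side: run a recovered job with validation suppressed.
>   def runRecoveredJob(body: => Unit): Unit =
>     disableOutputSpecValidation.withValue(true) { body }
>
>   // Core side: the guard in saveAsHadoopDataset() would then require
>   // the conf to enable validation AND no dynamic suppression.
>   def shouldValidate(enabledInConf: Boolean): Boolean =
>     enabledInConf && !disableOutputSpecValidation.value
> }
> {code}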


