[ 
https://issues.apache.org/jira/browse/SPARK-18970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15771583#comment-15771583
 ] 

Lev edited comment on SPARK-18970 at 12/27/16 8:15 PM:
-------------------------------------------------------

Actually this is exactly the behavior I want. My problem is that application 
appeared to be alive, but was not processing and new files after this message 
in the log. I intentionally included the portion of the log at the top, showing 
that file list was refreshed every couple of minutes before. But as you can see 
refreshing have stopped after the logged error.


was (Author: lev.numerify):
Actually this is exactly the behavior I want. My problem is that application 
appeared to be alive, but was not processing and new files after this message 
in the log. I intentionally included the portion of the log in the bottom, 
showing that file lisd was refreshed every couple of minutes before.

> FileSource failure during file list refresh doesn't cause an application to 
> fail, but stops further processing
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18970
>                 URL: https://issues.apache.org/jira/browse/SPARK-18970
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.0.0, 2.0.2
>            Reporter: Lev
>         Attachments: sparkerror.log
>
>
> Spark streaming application uses S3 files as streaming sources. After running 
> for several day processing stopped even though an application continued to 
> run. 
> Stack trace:
> java.io.FileNotFoundException: No such file or directory 
> 's3n://XXXXXXXXXXXXXXXXX'
>       at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:818)
>       at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:511)
>       at 
> org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:465)
>       at 
> org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:462)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> I believe 2 things should (or can) be fixed:
> 1. Application should fail in case of such an error.
> 2. Allow application to ignore such failure, since there is a chance that 
> during next refresh the error will not resurface. (In my case I believe an 
> error was cased by S3 cleaning the bucket exactly at the same moment when 
> refresh was running) 
> My code to create streaming processing looks as the following:
>       val cq = sqlContext.readStream
>         .format("json")
>         .schema(struct)
>         .load(s"input")
>         .writeStream
>         .option("checkpointLocation", s"checkpoints")
>         .foreach(new ForeachWriter[Row] {...})
>         .trigger(ProcessingTime("10 seconds")).start()
>               
>         cq.awaitTermination() 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to