[ 
https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Matveev updated SPARK-27808:
-------------------------------------
    Description: 
Currently it is not easily possible to make a structured streaming query to 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)>

  was:
Currently it is not easily possible to make a structured streaming query to 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)>


> Ability to ignore existing files for structured streaming
> ---------------------------------------------------------
>
>                 Key: SPARK-27808
>                 URL: https://issues.apache.org/jira/browse/SPARK-27808
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 2.3.3, 2.4.3
>            Reporter: Vladimir Matveev
>            Priority: Major
>
> Currently it is not easily possible to make a structured streaming query to 
> ignore all of the existing data inside a directory and only process new 
> files, created after the job was started. See here for example: 
> [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]
>  
> My use case is to ignore everything which existed in the directory when the 
> streaming job is first started (and there are no checkpoints), but to behave 
> as usual when the stream is restarted, e.g. catch up reading new files since 
> the last restart. This would allow us to use the streaming job for continuous 
> processing, with all the benefits it brings, but also to keep the possibility 
> to reprocess the data in the batch fashion by a different job, drop the 
> checkpoints and make the streaming job only run for the new data.
>  
> It would be great to have an option similar to the `newFilesOnly` option on 
> the original StreamingContext.fileStream method: 
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]
> but probably with slightly different semantics, described above (ignore all 
> existing for the first run, catch up for the following runs)>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to