[ 
https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847460#comment-16847460
 ] 

Gabor Somogyi commented on SPARK-27808:
---------------------------------------

You've marked the component as Structured Streaming but linked the DStreams 
documentation, so I'm not sure which one you're using.
With Structured Streaming, one can ignore existing data by using the noop 
sink, which is added in Spark 3.0. Once the existing data has been ignored 
this way, one can switch to the real sink type, which will pick up only the 
new data.
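
The approach described above could be sketched roughly as follows in PySpark. 
This is only an illustration under several assumptions: the helper names 
(sink_for_run, run_query), the text input format, and the idea of passing the 
real sink as a parameter are hypothetical and not from this issue. It also 
assumes both runs share the same checkpoint location, and that Spark accepts 
the change from noop to the chosen real sink for that checkpoint, which should 
be verified for each sink type.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Needed only for the type hints; the caller supplies the SparkSession.
    from pyspark.sql import SparkSession

NOOP_SINK = "noop"  # sink format that discards every row (Spark 3.0+)


def sink_for_run(skip_existing: bool, real_sink: str) -> str:
    """Return the noop sink for the one-off skip run, else the real sink."""
    return NOOP_SINK if skip_existing else real_sink


def run_query(
    spark: SparkSession,
    input_dir: str,
    checkpoint_dir: str,
    real_sink: str,
    skip_existing: bool,
) -> None:
    """Hypothetical helper: run the file stream against the chosen sink."""
    # Assumption: plain text input; an explicit schema is given for clarity.
    lines = spark.readStream.format("text").schema("value STRING").load(input_dir)

    writer = (
        lines.writeStream
        .format(sink_for_run(skip_existing, real_sink))
        # Both runs must use the same checkpoint so that the files consumed
        # by the noop run are remembered and skipped later.
        .option("checkpointLocation", checkpoint_dir)
    )

    if skip_existing:
        # Consume everything currently in the directory once, then stop.
        writer.trigger(once=True).start().awaitTermination()
    else:
        # Normal continuous run; only files not yet in the checkpoint are read.
        writer.start().awaitTermination()
```

A first call with skip_existing=True would record the existing files in the 
checkpoint while discarding their rows, and later calls with 
skip_existing=False and the same checkpoint_dir would then process only newer 
files.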


> Ability to ignore existing files for structured streaming
> ---------------------------------------------------------
>
>                 Key: SPARK-27808
>                 URL: https://issues.apache.org/jira/browse/SPARK-27808
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 2.3.3, 2.4.3
>            Reporter: Vladimir Matveev
>            Priority: Major
>
> Currently it is not easy to make a structured streaming query ignore all 
> of the existing data inside a directory and process only new files created 
> after the job was started. See here for example: 
> [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]
>  
> My use case is to ignore everything that existed in the directory when the 
> streaming job is first started (and there are no checkpoints), but to behave 
> as usual when the stream is restarted, i.e. catch up by reading new files 
> added since the last restart. This would allow us to use the streaming job 
> for continuous processing, with all the benefits it brings, while keeping 
> the option to reprocess the data in batch fashion with a different job, drop 
> the checkpoints, and have the streaming job run only on the new data.
>  
> It would be great to have an option similar to the `newFilesOnly` option on 
> the original StreamingContext.fileStream method: 
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]
> but probably with the slightly different semantics described above (ignore 
> all existing files on the first run, catch up on the following runs).



