[ https://issues.apache.org/jira/browse/SPARK-25155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585438#comment-16585438 ]

Steve Loughran commented on SPARK-25155:
----------------------------------------

From SPARK-17159 I have a more cloud-optimized stream source here:
[CloudInputDStream|https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/org/apache/spark/streaming/hortonworks/CloudInputDStream.scala];
see what you think and pick it up if you want to work on merging it in.

Optimising the scanning is good, but what the stores really need is listening
to the event streams of new files arriving; AWS S3 and Azure both support that,
I don't know about the others. A stream source which worked with them would be
*very* useful to people running on those infrastructures, as every GET/HEAD/LIST
call runs up a bill and has pretty tangible latency too.
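
For illustration, here is a minimal sketch of that event-driven approach, assuming
the bucket's S3 "ObjectCreated" notifications have already been wired to an SQS
queue; the queue URL and the AWS SDK v1 dependency are assumptions for the sketch,
not anything Spark provides today:

{code:scala}
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import scala.collection.JavaConverters._

object S3NewFileEvents {
  def main(args: Array[String]): Unit = {
    // Hypothetical queue receiving the bucket's ObjectCreated notifications.
    val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/new-files"
    val sqs = AmazonSQSClientBuilder.defaultClient()

    while (true) {
      // Long-poll the queue instead of issuing LIST/HEAD calls against the bucket.
      val request = new ReceiveMessageRequest(queueUrl)
        .withWaitTimeSeconds(20)
        .withMaxNumberOfMessages(10)
      for (msg <- sqs.receiveMessage(request).getMessages.asScala) {
        // Each body is S3 event JSON naming the bucket and key of the new object;
        // a real stream source would parse it and hand the path to the next batch.
        println(msg.getBody)
        sqs.deleteMessage(queueUrl, msg.getReceiptHandle)
      }
    }
  }
}
{code}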

> Streaming from storage doesn't work when no directories exist
> -------------------------------------------------------------
>
>                 Key: SPARK-25155
>                 URL: https://issues.apache.org/jira/browse/SPARK-25155
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Gil Vernik
>            Priority: Minor
>
> I have an issue related to the `org.apache.spark.streaming.dstream.FileInputDStream`
> method `findNewFiles`.
> Streaming for the given path is supposed to pick up new files only (based on the
> timestamp of the previous run). However, the code in Spark first obtains the
> directories, then finds the new files within each directory. Here is the
> relevant code:
> {code:scala}
> val directoryFilter = new PathFilter {
>   override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
> }
> val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
> val newFiles = directories.flatMap(dir =>
>   fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
> {code}
>  
> This is not optimized, as it always requires two accesses. In addition, it
> seems to be buggy.
> I have an S3 bucket “mydata” with the objects “a.csv” and “b.csv”. I noticed that
> fs.globStatus(“s3a://mydata/”, directoryFilter).map(_.getPath) returned 0
> directories, so “a.csv” and “b.csv” were not picked up by Spark.
> I tried making the path “s3a://mydata/*” and it didn't work either.
> I experienced the same problematic behavior with the local file system when I
> tried to stream from “/Users/streaming/*”.
> I suggest changing the code in Spark so that it first performs the list without
> directoryFilter, which does not seem to be needed at all. The code could be
> {code:scala}
> val directoriesOrfiles = fs.globStatus(directoryPath).map(_.getPath)
> {code}
> The flow would then be, for each entry in directoriesOrfiles (as sketched below):
>  * If it is a data object: Spark applies newFileFilter to the returned objects
>  * If it is a directory: the existing code performs an additional listing at
> the directory level
> This way it will pick up both files at the root of the path and the contents of
> directories.
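
To make that suggestion concrete, here is a minimal sketch of the proposed
single-pass flow; fs, directoryPath and newFileFilter are the names used in
FileInputDStream, while the combined scan itself is hypothetical, not existing
Spark code:

{code:scala}
import org.apache.hadoop.fs.FileStatus

// One glob without directoryFilter, then branch on the type of each entry.
// globStatus can return null when nothing matches, hence the Option wrapper.
val statuses = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
val newFiles: Array[String] = statuses.flatMap { status =>
  if (status.isDirectory) {
    // Existing behaviour: list the directory and keep files passing newFileFilter.
    fs.listStatus(status.getPath, newFileFilter).map(_.getPath.toString)
  } else if (newFileFilter.accept(status.getPath)) {
    // New behaviour: a plain object at the root of the path is picked up directly.
    Array(status.getPath.toString)
  } else {
    Array.empty[String]
  }
}
{code}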


