[ https://issues.apache.org/jira/browse/SPARK-25155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585355#comment-16585355 ]
Dongjoon Hyun commented on SPARK-25155:
---------------------------------------

Hi, [~gvernik]. Thank you for reporting, but `Fix Version` and `Target Version` will be handled by the Apache Spark committers later.
- http://spark.apache.org/contributing.html (`JIRA` section).

> Streaming from storage doesn't work when no directories exist
> --------------------------------------------------------------
>
>                 Key: SPARK-25155
>                 URL: https://issues.apache.org/jira/browse/SPARK-25155
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Gil Vernik
>            Priority: Minor
>
> The issue concerns `org.apache.spark.streaming.dstream.FileInputDStream`, method `findNewFiles`.
> Streaming on a given path is supposed to pick up new files only (based on the timestamp of the previous run). However, the code in Spark first collects directories, then finds new files inside each directory. Here is the relevant code:
>
>   val directoryFilter = new PathFilter {
>     override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
>   }
>   val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
>   val newFiles = directories.flatMap(dir =>
>     fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
>
> This is not optimal, since it always requires two accesses, and it also appears to be buggy.
> I have an S3 bucket "mydata" with objects "a.csv" and "b.csv". I noticed that fs.globStatus("s3a://mydata/", directoryFilter).map(_.getPath) returned 0 directories, so "a.csv" and "b.csv" were never picked up by Spark.
> Making the path "s3a://mydata/*" did not work either.
> I saw the same problematic behavior with the local file system when trying to stream from "/Users/streaming/*".
> I suggest changing the code in Spark so that the first listing is done without directoryFilter, which does not seem to be needed at all. The code could be
>
>   val directoriesOrFiles = fs.globStatus(directoryPath).map(_.getPath)
>
> The flow would then be, for each entry in directoriesOrFiles (sketched below):
> * If it is a data object: Spark applies newFileFilter to the returned objects.
> * If it is a directory: the existing code performs an additional listing at the directory level.
> This way files are picked up both from the root of the path and from the contents of directories.
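For illustration, below is a minimal sketch of the single-pass flow proposed in the quoted issue, using the Hadoop FileSystem API already referenced there (fs.globStatus, fs.listStatus, PathFilter). The method name findNewFilesSketch and its signature are hypothetical and only approximate the idea; this is not the actual code in FileInputDStream nor a finished patch.

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

    // Hypothetical helper, not the actual Spark patch. It assumes `fs`,
    // `directoryPath` and `newFileFilter` behave as they do inside
    // FileInputDStream.findNewFiles; the names here are illustrative only.
    def findNewFilesSketch(
        fs: FileSystem,
        directoryPath: Path,
        newFileFilter: PathFilter): Array[String] = {

      // Single glob without directoryFilter: matches both plain objects
      // (e.g. s3a://mydata/a.csv) and directories. globStatus may return
      // null when nothing matches, so guard against that.
      val directoriesOrFiles: Array[FileStatus] =
        Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])

      directoriesOrFiles.flatMap { status =>
        if (status.isDirectory) {
          // Existing behavior: list the directory and keep only new files.
          fs.listStatus(status.getPath, newFileFilter).map(_.getPath.toString).toSeq
        } else if (newFileFilter.accept(status.getPath)) {
          // Proposed behavior: apply newFileFilter directly to files sitting
          // at the root of the globbed path, so they are no longer skipped.
          Seq(status.getPath.toString)
        } else {
          Seq.empty[String]
        }
      }
    }

Handling files and directories from the same globStatus result removes the separate directory-only glob, so one listing covers both objects at the root of the path (such as s3a://mydata/a.csv) and files nested under directories.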