[ https://issues.apache.org/jira/browse/SPARK-31962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Christopher Highman updated SPARK-31962:
----------------------------------------
    Description: 
When using structured streaming or just loading from a file data source, I've encountered a number of occasions where I want to stream from a folder containing any number of historical files in CSV format. When I start reading from a folder, however, I might only care about files that were created after a certain time.

{code:java}
spark.read
     .option("header", "true")
     .option("delimiter", "\t")
     .format("csv")
     .load("/mnt/Deltas")
{code}

In [https://github.com/apache/spark/blob/f3771c6b47d0b3aef10b86586289a1f675c7cfe2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala], there is a method, _listLeafFiles_, which builds _FileStatus_ objects containing an implicit _modificationDate_ property. We may already iterate the resulting files if a filter is applied to the path. In this case, it's trivial to do a primitive comparison against _modificationDate_ and a date specified from an option. Even without a path filter specified, this check costs less than applying a path filter by itself, since we are only comparing primitives.

An option that specifies a timestamp when loading files from a path would minimize complexity for consumers who load files or use structured streaming from a folder path but have no interest in reading what could be thousands of files that are not relevant.

One example could be "_fileModifiedDate_" accepting a UTC datetime like below.

{code:java}
spark.read
     .option("header", "true")
     .option("delimiter", "\t")
     .option("fileModifiedDate", "2020-05-01T12:00:00")
     .format("csv")
     .load("/mnt/Deltas")
{code}

If this option is specified, files within the _"/mnt/Deltas/"_ path must have been modified at or after the specified time in order to be consumed, whether reading files from a folder path or via structured streaming.

I have unit tests passing under _FileIndexSuite_ in the _spark.sql.execution.datasources_ package.

Stack Overflow - (spark structured streaming file source read from a certain partition onwards) | [https://stackoverflow.com/questions/58004832/spark-structured-streaming-file-source-read-from-a-certain-partition-onwards]

Stack Overflow - (Spark Structured Streaming File Source Starting Offset) | [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset/51399134#51399134]

  was:
When using structured streaming or just loading from a file data source, I've encountered a number of occasions where I want to stream from a folder containing any number of historical files in CSV format. When I start reading from a folder, however, I might only care about files that were created after a certain time.

{code:java}
spark.read
     .option("header", "true")
     .option("delimiter", "\t")
     .format("csv")
     .load("/mnt/Deltas")
{code}

In [https://github.com/apache/spark/blob/f3771c6b47d0b3aef10b86586289a1f675c7cfe2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala], there is a method, _listLeafFiles_, which builds _FileStatus_ objects containing an implicit _modificationDate_ property. We may already iterate the resulting files if a filter is applied to the path. In this case, it's trivial to do a primitive comparison against _modificationDate_ and a date specified from an option. Even without a path filter specified, this check costs less than applying a path filter by itself, since we are only comparing primitives.

An option that specifies a timestamp when loading files from a path would minimize complexity for consumers who load files or use structured streaming from a folder path but have no interest in reading what could be thousands of files that are not relevant.

One example could be "_fileModifiedDate_" accepting a UTC datetime like below.

{code:java}
spark.read
     .option("header", "true")
     .option("delimiter", "\t")
     .option("fileModifiedDate", "2020-05-01T12:00:00")
     .format("csv")
     .load("/mnt/Deltas")
{code}

If this option is specified, files within the _"/mnt/Deltas/"_ path must have been modified at or after the specified time in order to be consumed, whether reading files from a folder path or via structured streaming.

I have unit tests passing under _FileIndexSuite_ in the _spark.sql.execution.datasources_ package.

Stack Overflow - [spark structured streaming file source read from a certain partition onwards|https://stackoverflow.com/questions/58004832/spark-structured-streaming-file-source-read-from-a-certain-partition-onwards]

Stack Overflow - [Spark Structured Streaming File Source Starting Offset|https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset/51399134#51399134]


> Provide option to load files after a specified date when reading from a folder path
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-31962
>                 URL: https://issues.apache.org/jira/browse/SPARK-31962
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Christopher Highman
>            Priority: Minor
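
The comparison proposed in the description can be sketched in isolation. The following is a minimal illustration in plain Java, not Spark's implementation. It assumes the option value is an ISO-8601 datetime interpreted as UTC and that modification times are epoch milliseconds, which is how Hadoop's FileStatus.getModificationTime() reports them. The class and method names (ModifiedAfterFilter, toEpochMillisUtc, modifiedAtOrAfter) are hypothetical, and a map from path to modification time stands in for the listed files.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: illustrates the "modified at or after" check only.
final class ModifiedAfterFilter {

    // Parse an ISO-8601 local datetime such as "2020-05-01T12:00:00",
    // interpret it as UTC, and return milliseconds since the epoch.
    static long toEpochMillisUtc(String timestamp) {
        return LocalDateTime.parse(timestamp)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Keep only the paths whose modification time is at or after the threshold.
    // The threshold is parsed once; each file costs one primitive comparison.
    static List<String> modifiedAtOrAfter(Map<String, Long> modTimesByPath, String timestamp) {
        long threshold = toEpochMillisUtc(timestamp);
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Long> entry : modTimesByPath.entrySet()) {
            if (entry.getValue() >= threshold) {
                kept.add(entry.getKey());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up file names and timestamps, for illustration only.
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("/mnt/Deltas/old.csv", toEpochMillisUtc("2020-04-30T23:59:59"));
        files.put("/mnt/Deltas/boundary.csv", toEpochMillisUtc("2020-05-01T12:00:00"));
        files.put("/mnt/Deltas/new.csv", toEpochMillisUtc("2020-06-15T08:30:00"));

        // prints [/mnt/Deltas/boundary.csv, /mnt/Deltas/new.csv]
        System.out.println(modifiedAtOrAfter(files, "2020-05-01T12:00:00"));
    }
}
```

The file modified exactly at the threshold is kept, matching the "at or later than the specified time" behavior requested in the description.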