[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-11-04 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r517320604



##
File path: python/pyspark/sql/readwriter.py
##
@@ -390,26 +407,37 @@ def parquet(self, *paths, **options):
 """
 Loads Parquet files, returning the result as a :class:`DataFrame`.
 
-:param mergeSchema: sets whether we should merge schemas collected from all
-    Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
-    The default value is specified in ``spark.sql.parquet.mergeSchema``.
-:param pathGlobFilter: an optional glob pattern to only include files with paths matching
-    the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
-    It does not change the behavior of `partition discovery`_.
-:param modifiedBefore: an optional timestamp to only include files with
-    modification times occurring before the specified time. The provided timestamp
-    must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
-:param modifiedAfter: an optional timestamp to only include files with
-    modification times occurring after the specified time. The provided timestamp
-    must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
-:param recursiveFileLookup: recursively scan a directory for files. Using this option
-    disables `partition discovery`_.
-
-.. _partition discovery:
-    https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
+.. versionadded:: 1.4.0
 
+Parameters
+----------
+paths : str
+
+Other Parameters
+----------------
+mergeSchema : str or bool, optional
+    sets whether we should merge schemas collected from all
+    Parquet part-files. This will override
+    ``spark.sql.parquet.mergeSchema``. The default value is specified in
+    ``spark.sql.parquet.mergeSchema``.
+pathGlobFilter : str or bool, optional
+    an optional glob pattern to only include files with paths matching
+    the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
+    It does not change the behavior of
+    `partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_.  # noqa
+recursiveFileLookup : str or bool, optional
+    recursively scan a directory for files. Using this option
+    disables
+    `partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_.  # noqa
+modifiedBefore : an optional timestamp to only include files with
+    modification times occurring before the specified time. The provided timestamp
+    must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+modifiedAfter : an optional timestamp to only include files with
+    modification times occurring after the specified time. The provided timestamp
+    must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Review comment:
   Net difference is addition of lines 432-437
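
   For context, a minimal sketch of these options against a Parquet source (Scala API shown; the option names are identical in the Python reader, and the input path is hypothetical):

```scala
// Only pick up Parquet files last modified before the given timestamp
// (format YYYY-MM-DDTHH:mm:ss, interpreted in the session time zone).
// Assumes a running SparkSession named `spark`, as in spark-shell.
val df = spark.read
  .option("modifiedBefore", "2020-06-01T13:00:00")
  .option("recursiveFileLookup", "true") // note: disables partition discovery
  .parquet("/data/events")               // hypothetical path
```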





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-11-04 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r517320323



##
File path: python/pyspark/sql/readwriter.py
##
@@ -263,62 +263,79 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 allows a mode for dealing with corrupt records during parsing. If None is
 set, it uses the default value, ``PERMISSIVE``.

-* ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
-  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
-  fields to ``null``. To keep corrupt records, a user can set a string type \
-  field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
-  schema does not have the field, it drops corrupt records during parsing. \
-  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
-  field in an output schema.
-*  ``DROPMALFORMED``: ignores the whole corrupted records.
-*  ``FAILFAST``: throws an exception when it meets corrupted records.
-
-:param columnNameOfCorruptRecord: allows renaming the new field having malformed string
-    created by ``PERMISSIVE`` mode. This overrides ``spark.sql.columnNameOfCorruptRecord``.
-    If None is set, it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``.
-:param dateFormat: sets the string that indicates a date format. Custom date formats
-    follow the formats at `datetime pattern`_. This applies to date type.
-    If None is set, it uses the default value, ``yyyy-MM-dd``.
-:param timestampFormat: sets the string that indicates a timestamp format.
-    Custom date formats follow the formats at `datetime pattern`_.
-    This applies to timestamp type. If None is set, it uses the
-    default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
-:param multiLine: parse one record, which may span multiple lines, per file. If None is
-    set, it uses the default value, ``false``.
-:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
-    characters (ASCII characters with value less than 32,
-    including tab and line feed characters) or not.
-:param encoding: allows forcibly setting one of standard basic or extended encodings for
-    the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
-    the encoding of input JSON will be detected automatically
-    when the multiLine option is set to ``true``.
-:param lineSep: defines the line separator that should be used for parsing. If None is
-    set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
-:param samplingRatio: defines fraction of input JSON objects used for schema inferring.
-    If None is set, it uses the default value, ``1.0``.
-:param dropFieldIfAllNull: whether to ignore column of all null values or empty
-    array/struct during schema inference. If None is set, it
-    uses the default value, ``false``.
-:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
-    it uses the default value, ``en-US``. For instance, ``locale`` is used while
-    parsing dates and timestamps.
-:param pathGlobFilter: an optional glob pattern to only include files with paths matching
-    the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
-    It does not change the behavior of `partition discovery`_.
-:param modifiedBefore: an optional timestamp to only include files with
-    modification times occurring before the specified time. The provided timestamp
-    must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
-:param modifiedAfter: an optional timestamp to only include files with
-    modification times occurring after the specified time. The provided timestamp
-    must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
-:param recursiveFileLookup: recursively scan a directory for files. Using this option
-
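
A minimal sketch exercising a few of the JSON reader options documented above (Scala API; the option names match the Python reader; `spark` is a running SparkSession and the path is hypothetical):

```scala
// PERMISSIVE mode keeps malformed rows, routing the raw malformed string
// into the configured corrupt-record column instead of failing the query.
val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .option("modifiedAfter", "2020-06-01T13:00:00") // only recently modified files
  .json("/data/people.json")                      // hypothetical path
```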

[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-09-05 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r483995638



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    // File.setLastModified expects milliseconds since the epoch
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test("SPARK-31962: when modifiedBefore specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now(ZoneOffset.UTC)
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+            .option("modifiedBefore", formattedTime)

Review comment:
   I updated the pathFilters source as well after re-running scalafmt. The only issue I noticed is that Spark's current scalafmt configuration produces different formatting from what you suggested.
   
   Example
   
![image](https://user-images.githubusercontent.com/659214/92314401-edf57300-ef8b-11ea-9366-1f053f24d78f.png)
   
   In this case, I defaulted to the scalafmt output for consistency.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-09-05 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r483940721



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  def createSingleFile(dir: File): File = {
+    val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+    stringToFile(file, "text")
+  }
+
+  def setFileTime(time: LocalDateTime, file: File): Boolean = {
+    val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+    val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+    file.setLastModified(sameTime * 1000)
+  }
+
+  def formatTime(time: LocalDateTime): String = {
+    time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+  }
+
+  test("SPARK-31962: when modifiedBefore specified" +
+      " and sharing same timestamp with file last modified time.") {
+    withTempDir { dir =>
+      val file = createSingleFile(dir)
+      val time = LocalDateTime.now(ZoneOffset.UTC)
+      setFileTime(time, file)
+      val formattedTime = formatTime(time)
+
+      val msg = intercept[AnalysisException] {
+        spark.read
+            .option("modifiedBefore", formattedTime)

Review comment:
   @maropu 
   I updated this source file after running the scalafmt Maven task. Attached is the output based on our current config.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-25 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r476643343



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {

Review comment:
   I'd love to follow up with something like this, perhaps in a secondary PR, but I probably won't have time to refactor these tests again for a couple of weeks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-23 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r475347166



##
File path: docs/sql-data-sources-generic-options.md
##
@@ -119,3 +119,48 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 
 
+
+### Modification Time Path Filters
+`modifiedBefore` and `modifiedAfter` are options that can be 
+applied together or separately in order to achieve greater
+granularity over which files may load during a Spark batch query.
+
+When the `timeZone` option is present, modified timestamps will be
+interpreted according to the specified zone. When a timezone option
+is not provided, modified timestamps will be interpreted according
+to the default zone specified within the Spark configuration. Without
+any timezone configuration, modified timestamps are interpreted as UTC.
+
+`modifiedBefore` will only allow files having last modified
+timestamps occurring before the specified time to load. For example,
+when `modifiedBefore` has the timestamp `2020-06-01T12:00:00` applied,
+all files modified after that time will not be considered when loading
+from a file data source.
+ 
+`modifiedAfter` only allows files having last modified timestamps
+occurring after the specified timestamp. For example, when `modifiedAfter`
+has the timestamp `2020-06-01T12:00:00` applied, only files modified after 
+this time will be eligible when loading from a file data source. When both
+`modifiedBefore` and `modifiedAfter` are specified together, files having
+last modified timestamps within the resulting time range are the only files
+allowed to load.

Review comment:
   Will update
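
   The behavior described above mirrors the usage examples in the PR's scaladoc; a minimal combined sketch (assuming a running `SparkSession` named `spark` and a hypothetical input path):

```scala
// Load only CSV files whose last-modified time falls strictly inside a
// window, interpreting the provided timestamps in the given time zone.
val df = spark.read
  .format("csv")
  .option("timeZone", "UTC")
  .option("modifiedAfter", "2020-06-01T00:00:00")  // lower bound
  .option("modifiedBefore", "2020-06-01T12:00:00") // upper bound
  .load("/data/landing")                           // hypothetical path
```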

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  file.setLastModified(DateTimeUtils.currentTimestamp())
+  val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  val df = spark.read
+.option("modifiedBefore", "2090-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a past date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  file.setLastModified(DateTimeUtils.currentTimestamp())
+  val msg = intercept[AnalysisException] {
+spark.read
+  .option("modifiedBefore", "1984-05-01T01:00:00")
+  .format("csv")
+  .load(path.toString)
+  }.getMessage
+  assert(msg.contains("Unable to infer schema for CSV"))
+}
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, one valid") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file1 = new File(dir, "file1.csv")
+  val file2 = new File(dir, "file2.csv")
+  stringToFile(file1, "text")
+  stringToFile(file2, "text")
+  

[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-18 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r472586599



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  file.setLastModified(DateTimeUtils.currentTimestamp())
+  val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  val df = spark.read
+.option("modifiedBefore", "2090-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a past date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  file.setLastModified(DateTimeUtils.currentTimestamp())
+  val msg = intercept[AnalysisException] {
+spark.read
+  .option("modifiedBefore", "1984-05-01T01:00:00")
+  .format("csv")
+  .load(path.toString)
+  }.getMessage
+  assert(msg.contains("Unable to infer schema for CSV"))
+}
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, one valid") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file1 = new File(dir, "file1.csv")
+  val file2 = new File(dir, "file2.csv")
+  stringToFile(file1, "text")
+  stringToFile(file2, "text")
+  file1.setLastModified(DateTimeUtils.currentTimestamp())

Review comment:
   So you're looking for the addition of one test that expects a false outcome when the timestamps are exactly the same, since at that moment the specified time has not yet been passed?
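
   A minimal sketch of such a test, reusing the suite's helpers shown above (`createSingleFile`, `setFileTime`, `formatTime`) and assuming the comparison is strict:

```scala
test("SPARK-31962: when modifiedAfter shares the same timestamp as the file") {
  withTempDir { dir =>
    val file = createSingleFile(dir)
    val time = LocalDateTime.now(ZoneOffset.UTC)
    setFileTime(time, file)

    // With a strict comparison, a file modified exactly at the threshold is
    // excluded, so no input files remain and schema inference fails.
    val msg = intercept[AnalysisException] {
      spark.read
        .option("modifiedAfter", formatTime(time))
        .format("csv")
        .load(dir.getCanonicalPath)
    }.getMessage
    assert(msg.contains("Unable to infer schema for CSV"))
  }
}
```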





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-18 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r472585965



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  file.setLastModified(DateTimeUtils.currentTimestamp())
+  val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  val df = spark.read
+.option("modifiedBefore", "2090-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a past date") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file = new File(dir, "file1.csv")
+  stringToFile(file, "text")
+  file.setLastModified(DateTimeUtils.currentTimestamp())
+  val msg = intercept[AnalysisException] {
+spark.read
+  .option("modifiedBefore", "1984-05-01T01:00:00")
+  .format("csv")
+  .load(path.toString)
+  }.getMessage
+  assert(msg.contains("Unable to infer schema for CSV"))
+}
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, one valid") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file1 = new File(dir, "file1.csv")
+  val file2 = new File(dir, "file2.csv")
+  stringToFile(file1, "text")
+  stringToFile(file2, "text")
+  file1.setLastModified(DateTimeUtils.currentTimestamp())
+  file2.setLastModified(0)
+  val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 1)
+}
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, both valid") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file1 = new File(dir, "file1.csv")
+  val file2 = new File(dir, "file2.csv")
+  stringToFile(file1, "text")
+  stringToFile(file2, "text")
+  file1.setLastModified(DateTimeUtils.currentTimestamp())
+  file2.setLastModified(DateTimeUtils.currentTimestamp())
+  val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+  assert(df.count() == 2)
+}
+  }
+
+  test("SPARK-31962: when modifiedAfter specified with a past date, multiple 
files, none valid") {
+withTempDir { dir =>
+  val path = new Path(dir.getCanonicalPath)
+  val file1 = new File(dir, "file1.csv")
+  val file2 = new File(dir, "file2.csv")
+  stringToFile(file1, "text")
+  stringToFile(file2, "text")
+  file1.setLastModified(0)
+  file2.setLastModified(0)
+  val msg = intercept[AnalysisException] {
+spark.read
+  .option("modifiedAfter", "1984-05-01T01:00:00")
+  .format("csv")
+  .load(path.toString)
+  }.getMessage
+  assert(msg.contains("Unable to infer schema for CSV"))
+}
+  

[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-18 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r472012713



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path, PathFilter}
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * {{{
+ *   spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified before date:
+ * {{{
+ *   spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified between two dates:
+ * {{{
+ *   spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ * .option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * @param sparkSession SparkSession
+ * @param hadoopConf Hadoop Configuration object
+ * @param options Map containing options
+ */
+abstract class ModifiedDateFilter(

Review comment:
   Thank you.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468993324



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path, PathFilter}
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterStrategy
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * SPARK-31962: Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ *
+ * Load all CSV files modified before date:
+ * spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ *
+ * Load all CSV files modified between two dates:
+ * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ * .option("modifiedBefore","2020-06-15T05:00:00").load()
+ *
+@param sparkSession SparkSession
+@param hadoopConf Hadoop Configuration object
+@param options Map containing options
+ */
+abstract class ModifiedDateFilter(sparkSession: SparkSession,
+hadoopConf: Configuration,
+ options: CaseInsensitiveMap[String])
+extends PathFilterStrategy(sparkSession, hadoopConf, options) {
+  lazy val timeZoneId: String = options.getOrElse(
+DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT),
+SQLConf.get.sessionLocalTimeZone)
+
+  /* Implicitly defaults to UTC if unable to parse */
+  lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId)
+  lazy val timeString: UTF8String =
+UTF8String.fromString(options.apply(strategy()))
+
+  def thresholdTime(): Long = {
+DateTimeUtils
+  .stringToTimestamp(timeString, timeZone.toZoneId)
+  .getOrElse(throw new AnalysisException(
+s"The timestamp provided for the '${strategy()}'" +
+  s" option is invalid.  The expected format is '-MM-DDTHH:mm:ss'. 
" +
+  s" Provided timestamp:  " +
+  s"${options.apply(strategy())}"))
+  }
+
+  def localTime(micros: Long): Long =
+DateTimeUtils.fromUTCTime(micros, timeZoneId)
+
+  def accept(fileStatus: FileStatus): Boolean
+  def accept(path: Path): Boolean
+  def strategy(): String
+}
+
+/**
+ * Filter used to determine whether file was modified
+ * before the provided timestamp.
+ *
+ @param sparkSession SparkSession
+ @param hadoopConf Hadoop Configuration object
+ @param options Map containing options
+ */
+class ModifiedBeforeFilter(sparkSession: SparkSession,
+hadoopConf: Configuration,
+options: CaseInsensitiveMap[String])
+extends ModifiedDateFilter(sparkSession, hadoopConf, options)
+with FileIndexFilter {

Review comment:
   Done
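
   For reference, the threshold computation in `thresholdTime()` quoted above can be approximated with plain `java.time`. This is only a sketch; the actual code path goes through `DateTimeUtils.stringToTimestamp`:

```scala
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

// Parse the option value in the session time zone and normalize it to
// epoch microseconds, roughly mirroring what thresholdTime() produces.
def thresholdMicros(value: String, zone: ZoneId): Long = {
  val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
  val instant = LocalDateTime.parse(value, fmt).atZone(zone).toInstant
  instant.getEpochSecond * 1000000L + instant.getNano / 1000L
}

// e.g. thresholdMicros("2020-06-15T05:00:00", ZoneId.of("UTC"))
```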





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468992874



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path, PathFilter}
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterStrategy
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * SPARK-31962: Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ *
+ * Load all CSV files modified before date:
+ * spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ *
+ * Load all CSV files modified between two dates:
+ * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ * .option("modifiedBefore","2020-06-15T05:00:00").load()
+ *
+@param sparkSession SparkSession
+@param hadoopConf Hadoop Configuration object
+@param options Map containing options
+ */
+abstract class ModifiedDateFilter(sparkSession: SparkSession,
+hadoopConf: Configuration,
+ options: CaseInsensitiveMap[String])
+extends PathFilterStrategy(sparkSession, hadoopConf, options) {
+  lazy val timeZoneId: String = options.getOrElse(
+DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT),
+SQLConf.get.sessionLocalTimeZone)
+
+  /* Implicitly defaults to UTC if unable to parse */
+  lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId)
+  lazy val timeString: UTF8String =
+UTF8String.fromString(options.apply(strategy()))
+
+  def thresholdTime(): Long = {
+DateTimeUtils
+  .stringToTimestamp(timeString, timeZone.toZoneId)
+  .getOrElse(throw new AnalysisException(
+s"The timestamp provided for the '${strategy()}'" +
+  s" option is invalid.  The expected format is '-MM-DDTHH:mm:ss'. 
" +
+  s" Provided timestamp:  " +
+  s"${options.apply(strategy())}"))
+  }
+
+  def localTime(micros: Long): Long =
+DateTimeUtils.fromUTCTime(micros, timeZoneId)
+
+  def accept(fileStatus: FileStatus): Boolean
+  def accept(path: Path): Boolean
+  def strategy(): String
+}
+
+/**
+ * Filter used to determine whether file was modified
+ * before the provided timestamp.
+ *
+ @param sparkSession SparkSession
+ @param hadoopConf Hadoop Configuration object
+ @param options Map containing options
+ */
+class ModifiedBeforeFilter(sparkSession: SparkSession,
+hadoopConf: Configuration,
+options: CaseInsensitiveMap[String])
+extends ModifiedDateFilter(sparkSession, hadoopConf, options)
+with FileIndexFilter {
+  override def accept(fileStatus: FileStatus): Boolean =
+/* We standardize on microseconds wherever possible */
+thresholdTime - localTime(
+  DateTimeUtils
+  /* getModificationTime returns in milliseconds */
+.millisToMicros(fileStatus.getModificationTime)) > 0
+
+  override def accept(path: Path): Boolean = true
+  override def strategy(): String = "modifiedbefore"
+}
+case object ModifiedBeforeFilter extends PathFilterObject {

Review comment:
   Done
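
   A small illustration of the unit handling behind `accept()` above; the values here are hypothetical:

```scala
import java.util.concurrent.TimeUnit

// FileStatus.getModificationTime is reported in milliseconds, while the
// parsed threshold is kept in microseconds, so the modification time must
// be scaled up before the two are compared.
val modificationMillis = 1592197200000L // hypothetical file mtime
val modificationMicros = TimeUnit.MILLISECONDS.toMicros(modificationMillis)
val thresholdMicros = 1592200800000000L // hypothetical parsed threshold

// ModifiedBeforeFilter keeps files strictly older than the threshold.
val accepted = thresholdMicros - modificationMicros > 0
```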





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468988635



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.LocalDateTime
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterFactory
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite
+extends QueryTest
+with SharedSparkSession
+with AdaptiveSparkPlanHelper {
+  import testImplicits._
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+file.setLastModified(DateTimeUtils.currentTimestamp())
+val df = spark.read
+  .option("modifiedAfter", "2019-05-10T01:11:00")
+  .format("csv")
+  .load(path.toString)
+assert(df.count() == 1)
+  }
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+val df = spark.read
+  .option("modifiedBefore", "2090-05-10T01:11:00")
+  .format("csv")
+  .load(path.toString)
+assert(df.count() == 1)
+  }
+}
+  }
+
+ test("SPARK-31962: when modifiedBefore specified with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+file.setLastModified(DateTimeUtils.currentTimestamp())
+val msg = intercept[AnalysisException] {
+  spark.read
+.option("modifiedBefore", "1984-05-01T01:00:00")
+.format("csv")
+.load(path.toString)
+}.getMessage
+assert(msg.contains("Unable to infer schema for CSV"))
+  }
+}
+  }
+
+test("SPARK-31962: when modifiedAfter specified with a past date, " +
+"multiple files, one valid") {
+withTempDir { dir =>
+withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file1 = new File(dir, "file1.csv")
+val file2 = new File(dir, "file2.csv")
+stringToFile(file1, "text")
+stringToFile(file2, "text")
+file1.setLastModified(DateTimeUtils.currentTimestamp())
+file2.setLastModified(0)
+val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+assert(df.count() == 1)
+}
+}
+}
+test("SPARK-31962: when modifiedAfter specified with a past date, " +
+"multiple files, both valid") {

Review comment:
   I updated the descriptions and distinguished the tests to make them clearer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, 

[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468988356



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.LocalDateTime
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterFactory
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite
+extends QueryTest
+with SharedSparkSession
+with AdaptiveSparkPlanHelper {

Review comment:
   You are correct.  This has been removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468988180



##
File path: docs/sql-data-sources-generic-options.md
##
@@ -119,3 +119,47 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 
 
+
+### Modification Time Path Filters
+`modifiedBefore` and `modifiedAfter` are options that can be 
+applied together or separately in order to achieve greater
+granularity over which files may load during a Spark batch query. 
+When the `timeZone` option is present, modified timestamps will be
+interpreted according to the specified zone.  When a timezone option
+is not provided, modified timestamps will be interpreted according
+to the default zone specified within the Spark configuration.  Without
+any timezone configuration, modified timestamps are interpreted as UTC.
+
+`modifiedBefore` will only allow files having last modified
+timestamps occurring before the specified time to load.  For example,
+when`modifiedBefore`has the timestamp `2020-06-01T12:00:00` applied,
+ all files modified after that time will not be considered when loading from a file data source.
+`modifiedAfter` only allows files having last modified timestamps
+occurring after the specified timestamp.   For example, when`modifiedAfter`
+has the timestamp `2020-06-01T12:00:00` applied, only files modified after 
+ this time will be eligible when loading from a file data source.
+ 

Review comment:
   I cleaned up this entire section much better.  Good catch.

##
File path: docs/sql-data-sources-generic-options.md
##
@@ -119,3 +119,47 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 
 
+
+### Modification Time Path Filters
+`modifiedBefore` and `modifiedAfter` are options that can be 
+applied together or separately in order to achieve greater
+granularity over which files may load during a Spark batch query. 
+When the `timeZone` option is present, modified timestamps will be
+interpreted according to the specified zone.  When a timezone option
+is not provided, modified timestamps will be interpreted according
+to the default zone specified within the Spark configuration.  Without
+any timezone configuration, modified timestamps are interpreted as UTC.
+
+`modifiedBefore` will only allow files having last modified
+timestamps occurring before the specified time to load.  For example,
+when`modifiedBefore`has the timestamp `2020-06-01T12:00:00` applied,
+ all files modified after that time will not be considered when loading from a file data source.
+`modifiedAfter` only allows files having last modified timestamps
+occurring after the specified timestamp.   For example, when`modifiedAfter`

Review comment:
   Cleaned up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468988409



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.LocalDateTime
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterFactory
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite
+extends QueryTest
+with SharedSparkSession
+with AdaptiveSparkPlanHelper {
+  import testImplicits._
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {

Review comment:
   I didn't initially see this.  Thank you for pointing that out.  I have 
removed these instances.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468972753



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path, PathFilter}
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import 
org.apache.spark.sql.execution.datasources.pathfilters.PathFilterStrategy
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * SPARK-31962: Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * 
spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ *
+ * Load all CSV files modified before date:
+ * 
spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ *
+ * Load all CSV files modified between two dates:
+ * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ * .option("modifiedBefore","2020-06-15T05:00:00").load()
+ *
+@param sparkSession SparkSession
+@param hadoopConf Hadoop Configuration object
+@param options Map containing options
+ */
+abstract class ModifiedDateFilter(sparkSession: SparkSession,
+hadoopConf: Configuration,
+ options: CaseInsensitiveMap[String])
+extends PathFilterStrategy(sparkSession, hadoopConf, options) {
+  lazy val timeZoneId: String = options.getOrElse(

Review comment:
   Thank you. I'm going to adjust my IDE settings to correctly reflect these styles.
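
   For readers skimming the archive: the `timeZoneId` line quoted above is truncated by the diff hunk. A minimal sketch of the kind of resolution it performs, assuming the option key is `timeZone` and the fallback is the session-local timezone (names below are illustrative, not necessarily the PR's exact code):

    import java.util.TimeZone
    import org.apache.spark.sql.SparkSession

    // Hypothetical sketch: pick the timezone from the reader options if present,
    // otherwise fall back to the session-local timezone, then the JVM default.
    def resolveTimeZoneId(spark: SparkSession, options: Map[String, String]): String =
      options.getOrElse(
        "timeZone",
        spark.conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID))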





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468970624



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/PathFilterFactory.scala
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.{FileIndexFilter, 
ModifiedAfterFilter, ModifiedBeforeFilter, PathGlobFilter}
+
+object PathFilterFactory {
+  PathFilterStrategies.register(ModifiedAfterFilter)
+  PathFilterStrategies.register(ModifiedBeforeFilter)
+  PathFilterStrategies.register(PathGlobFilter)

Review comment:
   We could hard-code them into the collection that tracks path filter strategies, but that would be less attractive. Registering them here just provides a very clean interface for accomplishing the same objective.
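
   To illustrate the registration pattern being described (all names below are hypothetical, not Spark's internals), a minimal string-keyed strategy registry could look like:

    import scala.collection.mutable

    // Minimal sketch of a registry that maps an option name to a strategy.
    trait FilterStrategy { def strategyName: String }

    object StrategyRegistry {
      private val strategies = mutable.Map.empty[String, FilterStrategy]

      def register(s: FilterStrategy): Unit =
        strategies.put(s.strategyName.toLowerCase, s)

      // Resolve a strategy from a (case-insensitive) user-supplied option key.
      def get(optionKey: String): Option[FilterStrategy] =
        strategies.get(optionKey.toLowerCase)
    }

   Registering each filter once at startup keeps the factory closed to modification while remaining open to new strategies.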





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468969864



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
##
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.LocalDateTime
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, CaseInsensitiveMap, 
DateTimeUtils}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.pathfilters.PathFilterFactory
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite
+extends QueryTest
+with SharedSparkSession
+with AdaptiveSparkPlanHelper {
+  import testImplicits._
+
+  test("SPARK-31962: when modifiedAfter specified with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+file.setLastModified(DateTimeUtils.currentTimestamp())
+val df = spark.read
+  .option("modifiedAfter", "2019-05-10T01:11:00")
+  .format("csv")
+  .load(path.toString)
+assert(df.count() == 1)
+  }
+}
+  }
+
+  test("SPARK-31962: when modifiedBefore specified with a future date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+val df = spark.read
+  .option("modifiedBefore", "2090-05-10T01:11:00")
+  .format("csv")
+  .load(path.toString)
+assert(df.count() == 1)
+  }
+}
+  }
+
+ test("SPARK-31962: when modifiedBefore specified with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+file.setLastModified(DateTimeUtils.currentTimestamp())
+val msg = intercept[AnalysisException] {
+  spark.read
+.option("modifiedBefore", "1984-05-01T01:00:00")
+.format("csv")
+.load(path.toString)
+}.getMessage
+assert(msg.contains("Unable to infer schema for CSV"))

Review comment:
   When attempting to load a CSV-based file data source, this is the error message returned if no files are found; it is existing default behavior. Whenever a filter scenario results in all files being filtered out, the tests catch this exception and use it to identify the result.
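
   In other words, from the caller's perspective the "everything was filtered out" case surfaces as an AnalysisException rather than an empty DataFrame. A sketch of that behavior, with path and timestamp purely illustrative:

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      // A modifiedBefore bound in the distant past excludes every file,
      // so CSV schema inference has nothing to read and fails.
      spark.read
        .option("modifiedBefore", "1984-05-01T01:00:00")
        .format("csv")
        .load("/tmp/some/csv/dir")  // hypothetical path
    } catch {
      case e: AnalysisException =>
        assert(e.getMessage.contains("Unable to infer schema for CSV"))
    }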





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468636352



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/ModifiedAfterFilter.scala
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+
+/**
+ * [SPARK-31962]
+ * Filter used to determine whether file was modified
+ * after the provided timestamp.
+ *
+ * @param sparkSession SparkSession
+ * @param hadoopConf Hadoop Configuration object
+ * @param options Map containing options
+ */
+case class ModifiedAfterFilter(sparkSession: SparkSession,
+   hadoopConf: Configuration,
+   options: CaseInsensitiveMap[String])
+extends ModifiedDateFilter(sparkSession, hadoopConf, options)

Review comment:
   All instances where these class defs were off have been addressed.

##
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##
@@ -467,6 +467,14 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with 
paths matching
* the pattern. The syntax follows 
org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+   * `modifiedBefore`: an optional timestamp to only include files with
+   * modification times  occurring before the specified time.  The provided 
timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss  Example:
+   * 2020-06-01T13:00:00
+   * `modifiedAfter`: an optional timestamp to only include files with
+   * modification times occurring after the specified time.  The provided 
timestamp

Review comment:
   This has been created in readwriter.py and in this file as well.
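
   For reference, the combined usage described by this doc text reads as follows in Scala (bounds and path are illustrative; `spark` is an existing SparkSession):

    // Load only files whose last modified time falls inside the window.
    val df = spark.read
      .format("csv")
      .option("modifiedAfter", "2020-06-01T13:00:00")
      .option("modifiedBefore", "2020-07-01T13:00:00")
      .load("/path/to/dir")  // hypothetical path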





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468636840



##
File path: python/pyspark/sql/readwriter.py
##
@@ -114,6 +114,14 @@ def option(self, key, value):
 * ``pathGlobFilter``: an optional glob pattern to only include 
files with paths matching
 the pattern. The syntax follows 
org.apache.hadoop.fs.GlobFilter.
 It does not change the behavior of partition discovery.
+* ``modifiedBefore``: an optional timestamp to only include files 
with
+modification times occurring before the specified time.  The 
provided timestamp
+must be in the following format: YYYY-MM-DDTHH:mm:ss

Review comment:
   Completed throughout the files.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468636699



##
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##
@@ -467,6 +467,14 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with 
paths matching
* the pattern. The syntax follows 
org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+   * `modifiedBefore`: an optional timestamp to only include files with
+   * modification times  occurring before the specified time.  The provided 
timestamp
+   * must be in the following form: YYYY-MM-DDTHH:mm:ss  Example:

Review comment:
   Done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468636142



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/FileIndexFilter.scala
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import org.apache.hadoop.fs.{FileStatus, PathFilter}
+
+
+trait FileIndexFilter extends PathFilter with Serializable {

Review comment:
   This restructuring has been completed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468636017



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/FileIndexFilter.scala
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import org.apache.hadoop.fs.{FileStatus, PathFilter}
+
+
+trait FileIndexFilter extends PathFilter with Serializable {
+  def accept(fileStatus: FileStatus): Boolean
+  def strategy(): String

Review comment:
   There is a direct string comparison between the option specified and the string representation of the strategy in PathFilterStrategies. That's why it's a string. It's not meant to represent a class per se.
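
   A small sketch of that direct comparison (hypothetical names, not the PR's exact code): each strategy advertises a string identifier, and the factory matches user-supplied option keys against it.

    // Each strategy identifies itself by the option name it handles.
    case class Strategy(name: String)

    val registered = Seq(
      Strategy("modifiedafter"),
      Strategy("modifiedbefore"),
      Strategy("pathglobfilter"))

    // Select the strategies whose identifier matches a supplied option key.
    def strategiesFor(options: Map[String, String]): Seq[Strategy] =
      registered.filter(s => options.keys.exists(_.equalsIgnoreCase(s.name)))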





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468635076



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
##
@@ -561,6 +564,367 @@ class FileBasedDataSourceSuite extends QueryTest
 }
   }
 
+  test("SPARK-31962 - when modifiedAfter specified " +

Review comment:
   Adjusted throughout the files.

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathfilters/ModifiedAfterFilter.scala
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.pathfilters
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+
+/**
+ * [SPARK-31962]

Review comment:
   Removed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468634664



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
##
@@ -561,6 +564,367 @@ class FileBasedDataSourceSuite extends QueryTest
 }
   }
 
+  test("SPARK-31962 - when modifiedAfter specified " +
+  "with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")
+stringToFile(file, "text")
+file.setLastModified(DateTimeUtils.currentTimestamp())
+val df = spark.read
+.option("modifiedAfter", "2019-05-10T01:11:00")
+.format("csv")
+.load(path.toString)
+assert(df.count() == 1)
+  }
+}
+  }
+
+  test("SPARK-31962 - when modifiedBefore specified " +
+  "with a future date") {

Review comment:
   Completed

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
##
@@ -561,6 +564,367 @@ class FileBasedDataSourceSuite extends QueryTest
 }
   }
 
+  test("SPARK-31962 - when modifiedAfter specified " +
+  "with a past date") {

Review comment:
   Completed

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
##
@@ -561,6 +564,367 @@ class FileBasedDataSourceSuite extends QueryTest
 }
   }
 
+  test("SPARK-31962 - when modifiedAfter specified " +
+  "with a past date") {
+withTempDir { dir =>
+  withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+val path = new Path(dir.getCanonicalPath)
+val file = new File(dir, "file1.csv")

Review comment:
   Six more tests were added.
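
   One plausible shape for such a case, modeled directly on the tests quoted above (the body below is a sketch, not the PR's exact test): a window spanning "now" should keep the freshly written file visible.

    test("SPARK-31962 - when modifiedAfter and modifiedBefore bound a window containing now") {
      withTempDir { dir =>
        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
          val path = new Path(dir.getCanonicalPath)
          val file = new File(dir, "file1.csv")
          stringToFile(file, "text")  // last modified time is "now"
          val df = spark.read
            .option("modifiedAfter", "2019-05-10T01:11:00")
            .option("modifiedBefore", "2090-05-10T01:11:00")
            .format("csv")
            .load(path.toString)
          assert(df.count() == 1)
        }
      }
    }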





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-08-11 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r468630906



##
File path: docs/sql-data-sources-generic-options.md
##
@@ -119,3 +119,37 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 
 
+
+### Modification Time Path Filters
+`modifiedBefore` and `modifiedAfter` are options that can be 
+applied together or separately in order to achieve greater
+granularity over which files may load during a Spark batch query. 
+When the `timeZone` option is present, modified timestamps will be
+interpreted according to the specified zone.  When a timezone option
+is not provided, modified timestamps will be interpreted according
+to the default zone specified within the Spark configuration.  Without
+any timezone configuration, modified timestamps are interpreted as UTC.
+
+`modifiedBefore` will only allow files having last modified
+timestamps occurring before the specified time to load.  For example,
+when `modifiedBefore` is set to `2020-06-01T12:00:00`, all files modified
+after that time will not be considered when loading from a file data source.
+`modifiedAfter` only allows files having last modified timestamps
+occurring after the specified timestamp.  For example, when `modifiedAfter`
+is set to `2020-06-01T12:00:00`, only files modified after this time
+will be eligible when loading from a file data source.
+
+When both `modifiedBefore` and `modifiedAfter` are specified together,
+only files having last modified timestamps within the resulting time
+range are loaded.
+
+Both options expect a timestamp in the following format: `YYYY-MM-DDTHH:mm:ss`.
+
+Example - Load files modified between 6/1 and 7/1:
+`spark`
+`  .read`
+`  .format("csv")`
+`  .option("modifiedBefore", "2020-07-01T08:33:05")`
+`  .option("modifiedAfter", "2020-06-01T08:33:05")`
+`  .option("timeZone", "PST")`

Review comment:
   Completed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-07-06 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r450584173



##
File path: docs/sql-data-sources-generic-options.md
##
@@ -119,3 +119,31 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 
 
+
+### Modification Date Filter
+
+`modifiedDateFilter` is an option used to only load files after a specified 
modification

Review comment:
   If you agree with this approach, I'll make sure every reference to "date" above and in the docs uses "time" instead.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-07-06 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r450583791



##
File path: docs/sql-data-sources-generic-options.md
##
@@ -119,3 +119,31 @@ To load all files recursively, you can use:
 {% include_example recursive_file_lookup r/RSparkSQLExample.R %}
 
 
+
+### Modification Date Filter
+
+`modifiedDateFilter` is an option used to only load files after a specified 
modification

Review comment:
   @gengliangwang 
   This is out of date after recent comments from @HeartSaVioR. I updated the PR title/description above based on that feedback. I'm currently working on these additions, shown below:
   
   **Example Usages**
   _Load all CSV files modified after date:_
   
`spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()`
   
   _Load all CSV files modified before date:_
   
`spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()`
   
   _Load all CSV files modified between two dates:_
   
`spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org






[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-07-06 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r450576299



##
File path: examples/src/main/python/sql/datasource.py
##
@@ -69,6 +69,25 @@ def generic_file_source_options_example(spark):
 # +-+
 # $example off:load_with_path_glob_filter$
 
+# $example on:load_with_modified_date_filter$
+# Assume files in the provided path were last modified today.
+df = spark.read.load("examples/src/main/resources/dir1",
+ format="parquet", 
modifiedDateFilter="2020-06-01T08:30:00")

Review comment:
   Great question, by the way. Thank you.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cchighman commented on a change in pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

2020-07-06 Thread GitBox


cchighman commented on a change in pull request #28841:
URL: https://github.com/apache/spark/pull/28841#discussion_r450576025



##
File path: examples/src/main/python/sql/datasource.py
##
@@ -69,6 +69,25 @@ def generic_file_source_options_example(spark):
 # +-+
 # $example off:load_with_path_glob_filter$
 
+# $example on:load_with_modified_date_filter$
+# Assume files in the provided path were last modified today.
+df = spark.read.load("examples/src/main/resources/dir1",
+ format="parquet", 
modifiedDateFilter="2020-06-01T08:30:00")

Review comment:
   @gengliangwang 
   I wonder if we should default to UTC unless the option below or a Spark conf setting for the timezone is specified. If one is specified, should we interpret the timestamp according to that offset from UTC?
   
   
![image](https://user-images.githubusercontent.com/659214/86691815-08c45f00-bfbe-11ea-91e1-57605333aa45.png)
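
   One way to picture the proposed interpretation, using plain java.time and independent of Spark's actual implementation: parse the zone-less timestamp, attach the configured zone, and default to UTC.

    import java.time.{LocalDateTime, ZoneId, ZoneOffset}

    // Illustrative only: interpret a zone-less option timestamp against a
    // configured timezone, defaulting to UTC when none is supplied.
    def toEpochMillis(timestamp: String, timeZoneId: Option[String]): Long = {
      val zone = timeZoneId.map(ZoneId.of).getOrElse(ZoneOffset.UTC)
      LocalDateTime.parse(timestamp)  // e.g. "2020-06-01T08:30:00"
        .atZone(zone)
        .toInstant
        .toEpochMilli
    }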
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


