This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 60d02b444e2 [SPARK-45316][CORE][SQL] Add new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`
60d02b444e2 is described below

commit 60d02b444e2225b3afbe4955dabbea505e9f769c
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Sep 26 17:33:07 2023 +0300

    [SPARK-45316][CORE][SQL] Add new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to add the new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`, and set them to the current values of:
    - `spark.files.ignoreCorruptFiles`/`ignoreMissingFiles` in Spark `core`,
    - `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` when the RDDs are created in Spark SQL.
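    
    As an illustration only (not part of the patch), here is a minimal sketch of passing the new parameters explicitly through the `NewHadoopRDD` constructor added here; the input path is hypothetical, and direct instantiation is normally discouraged in favor of the `SparkContext` APIs:
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.NewHadoopRDD

    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))

    // Point the Hadoop conf at a (hypothetical) input directory.
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.set("mapreduce.input.fileinputformat.inputdir", "/tmp/demo-input")

    // The flags become per-RDD constructor arguments instead of being read from
    // spark.files.ignoreCorruptFiles / spark.files.ignoreMissingFiles inside the RDD.
    val rdd = new NewHadoopRDD[LongWritable, Text](
      sc,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      hadoopConf,
      ignoreCorruptFiles = false,
      ignoreMissingFiles = true)
    ```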
    
    ### Why are the changes needed?
    1. To make `HadoopRDD` and `NewHadoopRDD` consistent with other RDDs created by Spark SQL, such as `FileScanRDD`, which take into account the SQL configs `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles`.
    2. To improve the user experience with Spark SQL: users can control the ignoring of corrupt and missing files without re-creating the Spark context.
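    
    For illustration, a hedged sketch of the improved workflow (the table name is hypothetical): the SQL config can now be flipped at runtime and is picked up by the next Hive table scan, whereas previously the core config was read via the `SparkContext` and changing it required re-creating the context.
    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Toggle the SQL config at runtime -- no need to stop and re-create
    // the SparkContext.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    spark.sql("SELECT count(*) FROM partitioned_hive_table").show()

    // Switch back to strict behavior for subsequent reads.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "false")
    ```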
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. `HadoopRDD`/`NewHadoopRDD` invoked by SQL code, such as Hive table scans, now respect the SQL configs `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` and no longer respect the core configs `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
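    
    As a hedged illustration of the resulting split (paths and table names are hypothetical): RDDs built directly on the `SparkContext` keep falling back to the core configs via the new auxiliary constructors, while SQL paths such as Hive table scans follow the SQL configs.
    ```scala
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val sc = spark.sparkContext

    // Core configs (spark.files.*) still apply to RDDs created through the
    // SparkContext APIs: the auxiliary constructors fall back to sc.conf.
    val raw = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/tmp/raw-logs")

    // SQL reads such as Hive table scans now honor the SQL configs instead.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    spark.sql("SELECT * FROM hive_logs").count()
    ```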
    
    ### How was this patch tested?
    By running the affected tests:
    ```
    $ build/sbt "test:testOnly *QueryPartitionSuite"
    $ build/sbt "test:testOnly *FileSuite"
    $ build/sbt "test:testOnly *FileBasedDataSourceSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43097 from MaxGekk/dynamic-ignoreMissingFiles.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     | 31 ++++++++++++++++++----
 .../scala/org/apache/spark/rdd/NewHadoopRDD.scala  | 27 +++++++++++++++----
 docs/sql-migration-guide.md                        |  1 +
 .../org/apache/spark/sql/hive/TableReader.scala    |  9 ++++---
 .../spark/sql/hive/QueryPartitionSuite.scala       |  6 ++---
 5 files changed, 58 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index cad107256c5..0b5f6a3d716 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -89,6 +89,8 @@ private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: Inp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
+ * @param ignoreCorruptFiles Whether to ignore corrupt files.
+ * @param ignoreMissingFiles Whether to ignore missing files.
  *
  * @note Instantiating this class directly is not recommended, please use
  * `org.apache.spark.SparkContext.hadoopRDD()`
@@ -101,13 +103,36 @@ class HadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minPartitions: Int)
+    minPartitions: Int,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   if (initLocalJobConfFuncOpt.isDefined) {
     sparkContext.clean(initLocalJobConfFuncOpt.get)
   }
 
+  def this(
+      sc: SparkContext,
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      initLocalJobConfFuncOpt: Option[JobConf => Unit],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minPartitions: Int) = {
+    this(
+      sc,
+      broadcastedConf,
+      initLocalJobConfFuncOpt,
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minPartitions,
+      ignoreCorruptFiles = sc.conf.get(IGNORE_CORRUPT_FILES),
+      ignoreMissingFiles = sc.conf.get(IGNORE_MISSING_FILES)
+    )
+  }
+
   def this(
       sc: SparkContext,
       conf: JobConf,
@@ -135,10 +160,6 @@ class HadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
-  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
-
-  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
-
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 119fdae531f..17ef3214889 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -64,6 +64,8 @@ private[spark] class NewHadoopPartition(
  * @param inputFormatClass Storage format of the data to be read.
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param ignoreCorruptFiles Whether to ignore corrupt files.
+ * @param ignoreMissingFiles Whether to ignore missing files.
  *
  * @note Instantiating this class directly is not recommended, please use
  * `org.apache.spark.SparkContext.newAPIHadoopRDD()`
@@ -74,9 +76,28 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient private val _conf: Configuration)
+    @transient private val _conf: Configuration,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  def this(
+      sc : SparkContext,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      _conf: Configuration) = {
+    this(
+      sc,
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      _conf,
+      ignoreCorruptFiles = sc.conf.get(IGNORE_CORRUPT_FILES),
+      ignoreMissingFiles = sc.conf.get(IGNORE_MISSING_FILES))
+  }
+
+
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
   // private val serializableConf = new SerializableWritable(_conf)
@@ -90,10 +111,6 @@ class NewHadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
-  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
-
-  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
-
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   def getConf: Configuration = {
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 61173f58b77..56a3c8292cd 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Spark SQL 3.5 to 4.0
 
 - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
+- Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core configs `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
 
 ## Upgrading from Spark SQL 3.4 to 3.5
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index cd1d236dd36..5bb982624b0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -366,7 +366,9 @@ class HadoopTableReader(
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      _minSplitsPerRDD)
+      _minSplitsPerRDD,
+      ignoreCorruptFiles = conf.ignoreCorruptFiles,
+      ignoreMissingFiles = conf.ignoreMissingFiles)
 
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
@@ -400,8 +402,9 @@ class HadoopTableReader(
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      jobConf
-    )
+      jobConf,
+      ignoreCorruptFiles = conf.ignoreCorruptFiles,
+      ignoreMissingFiles = conf.ignoreMissingFiles)
 
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index cec6ec1ee12..f4fb18119fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.sql.Timestamp
 
-import org.apache.spark.internal.config._
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -73,8 +72,9 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
   }
 
   test("Replace spark.sql.hive.verifyPartitionPath by 
spark.files.ignoreMissingFiles") {
-    withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
-      sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
+    withSQLConf(
+      SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false",
+      SQLConf.IGNORE_MISSING_FILES.key -> "true") {
       queryWhenPathNotExist()
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
