c21 commented on a change in pull request #31413:
URL: https://github.com/apache/spark/pull/31413#discussion_r568318526



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles

Review comment:
       @maropu - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file 
name is invalid
+                true
+              } else {
+                throw new IllegalStateException(
+                  s"Invalid bucket file $filePath when doing bucket pruning. " 
+
+                  s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable 
exception " +
+                    "and read the file.")

Review comment:
       @maropu - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file 
name is invalid
+                true
+              } else {
+                throw new IllegalStateException(
+                  s"Invalid bucket file $filePath when doing bucket pruning. " 
+
+                  s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable 
exception " +

Review comment:
       @maropu - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {

Review comment:
       I am very bad at naming :) This is suggested from 
https://github.com/apache/spark/pull/31413#discussion_r567614728. Shall I 
change again? cc @maropu .

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file 
name is invalid

Review comment:
       @sunchao - this is newly introduced. Updated PR description.
   
   > Also I'm not sure if this is the best choice: if a bucketed table is 
corrupted, should we read the corrupt file? it will likely lead to incorrect 
results. On the other hand we can choose to ignore the file which seems to be 
more aligned with the name of the config, although result could still be 
incorrect.
   
   Note by default the exception will be thrown here and query will be failed 
loud. We allow a config here to help existing users  to work around if they 
want. See relevant discussion in 
https://github.com/apache/spark/pull/31413#discussion_r567623746 .

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {

Review comment:
       Changed to `shouldProcess` as I feel `shouldNotPrune` is hard to reason 
about.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file 
name is invalid

Review comment:
       I feel either skipping or processing the file is no way perfect. There 
can be other corruption case, where e.g. the table (specified with 1024 
buckets), but only had 500 files underneath. This could be due to some other 
compute engines or users accidentally dump data here without respecting spark 
bucketing metadata. We have no efficient way to handle if number of files fewer 
than number of buckets.
   
   The existing usage of `ignoreCorruptFiles` [skip reading some of content of 
file](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala#L160),
 so it's also not completely ignoring. But I am fine if we think we need 
another config name for this.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file 
name is invalid

Review comment:
       Given users explicitly disable bucketing here for reading the table, I 
would assume they want to read the table as a non-bucketed table, so they would 
like to read all of input files, no? cc @viirya what's the use case you are 
thinking here? Thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = 
fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file 
name is invalid

Review comment:
       @cloud-fan - sorry which part you are suggesting to do in a followup PR? 
Here we anyway need to decide how do we handle when file name is not a valid 
bucket file name (process or not process the file) for pruning. Does I miss 
anything?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to