c21 commented on a change in pull request #31413: URL: https://github.com/apache/spark/pull/31413#discussion_r568318526
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles Review comment: @maropu - updated. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid + true + } else { + throw new IllegalStateException( + s"Invalid bucket file $filePath when doing bucket pruning. " + + s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " + + "and read the file.") Review comment: @maropu - updated. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid + true + } else { + throw new IllegalStateException( + s"Invalid bucket file $filePath when doing bucket pruning. " + + s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " + Review comment: @maropu - updated. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { Review comment: I am very bad at naming :) This is suggested from https://github.com/apache/spark/pull/31413#discussion_r567614728. Shall I change again? cc @maropu . ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid Review comment: @sunchao - this is newly introduced. Updated PR description. > Also I'm not sure if this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? it will likely lead to incorrect results. On the other hand we can choose to ignore the file which seems to be more aligned with the name of the config, although result could still be incorrect. Note by default the exception will be thrown here and query will be failed loud. We allow a config here to help existing users to work around if they want. See relevant discussion in https://github.com/apache/spark/pull/31413#discussion_r567623746 . ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { Review comment: Changed to `shouldProcess` as I feel `shouldNotPrune` is hard to reason about. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid Review comment: I feel either skipping or processing the file is no way perfect. There can be other corruption case, where e.g. the table (specified with 1024 buckets), but only had 500 files underneath. This could be due to some other compute engines or users accidentally dump data here without respecting spark bucketing metadata. We have no efficient way to handle if number of files fewer than number of buckets. The existing usage of `ignoreCorruptFiles` [skip reading some of content of file](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala#L160), so it's also not completely ignoring. But I am fine if we think we need another config name for this. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid Review comment: Given users explicitly disable bucketing here for reading the table, I would assume they want to read the table as a non-bucketed table, so they would like to read all of input files, no? cc @viirya what's the use case you are thinking here? Thanks. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ########## @@ -591,20 +590,48 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid Review comment: @cloud-fan - sorry which part you are suggesting to do in a followup PR? Here we anyway need to decide how do we handle when file name is not a valid bucket file name (process or not process the file) for pruning. Does I miss anything? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org