[ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ]
Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:02 PM:
----------------------------------------------------------------

This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have high-cardinality columns and a growing number of files per bucket, bucketing becomes useless: we are stuck with a single task per bucket. In my example below it takes a 2hr query on a bucketed table down to a 2min query, even with the listFiles overhead of the manual read. The workaround only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where(col(partition_col) === lit(target_partition) && col(bucket_col) === lit(bucket_target))

// Grab the FileSourceScanExec from the executed plan to reach the pruned file list.
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get

// The scan's input RDD is a FileScanRDD whose partitions carry the bucket files.
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0) // PartitionedFile.toString starts with "path: <file>, ..."

// Derive the format from the file extension and re-read the pruned files directly,
// so the read is split into normal file-sized tasks instead of one task per bucket.
val format = bucket_files(0).split("\\.").last
val result_df = spark.read
  .option("mergeSchema", "false")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))
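Once this ticket's change is available (Fix For: 3.2.0), a simpler route than the manual file listing above may be to disable bucketed table scan and rely on the decoupled bucket filter pruning. A minimal sketch, assuming the pruning applies when bucketed scan is turned off via the existing spark.sql.sources.bucketing.enabled config (table and column names reuse the placeholders above):

import org.apache.spark.sql.functions.{col, lit}

// With bucketed scan disabled, the scan falls back to regular file splits
// (restoring parallelism); with this change the bucket filter should still
// prune the non-matching bucket files.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")

val pruned = spark.table("ex_db.ex_tbl")
  .where(col("date") === lit("2021-01-01") && col("bucket_col") === lit("valuex"))

// The scan node's metadata should report the pruned bucket count
// (e.g. "SelectedBucketsCount") if pruning kicked in.
pruned.explain()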
> Decouple bucket filter pruning and bucket table scan
> ----------------------------------------------------
>
>                 Key: SPARK-32985
>                 URL: https://issues.apache.org/jira/browse/SPARK-32985
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Assignee: Cheng Su
>            Priority: Minor
>             Fix For: 3.2.0
>
> As a followup from the discussion in
> [https://github.com/apache/spark/pull/29804#discussion_r493100510].
> Currently in the data source v1 file scan `FileSourceScanExec`, bucket filter
> pruning only takes effect with bucketed table scan -
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542].
> However, this is unnecessary: bucket filter pruning can also happen if we
> disable bucketed table scan. This lets queries leverage bucket filter pruning
> to save CPU/IO by not reading unnecessary bucket files, without being bound to
> bucketed table scan when task parallelism is a concern.
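To make the pruning in the description concrete: for an equality predicate on the bucket column, the matching bucket id can be computed the same way the writer assigned rows to buckets (a Murmur3 hash of the value modulo the bucket count), and files belonging to every other bucket can be skipped. A minimal sketch using catalyst expressions, where the 1024-bucket count is an assumed placeholder and the expression is believed to mirror what HashPartitioning evaluates:

import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod}

// Bucket id for the filter value, computed as pmod(murmur3hash(value), numBuckets),
// matching how rows were assigned to buckets at write time.
val numBuckets = 1024 // placeholder: must match the table's declared bucket count
val bucketId = Pmod(new Murmur3Hash(Seq(Literal("valuex"))), Literal(numBuckets))
  .eval().asInstanceOf[Int]

// Bucket files embed the bucket id in their name (e.g. part-00000-<uuid>_00042...),
// so only files whose suffix matches this id need to be scanned.
println(s"only bucket $bucketId of $numBuckets needs scanning")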