[ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ]

Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:02 PM:
----------------------------------------------------------------

This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, for date-partitioned 
tables with high-cardinality columns and a growing number of files per bucket, 
bucketing becomes useless if we cannot split the read tasks across multiple 
Spark workers and are stuck with one task per executor. My workaround below 
only works for individual partition reads, but it takes a 2hr query on the 
bucketed table down to a 2min query, even with the listFiles overhead of the 
manual read.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}

// Build the filtered query against the bucketed table.
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) &&
         (col(bucket_col) === lit(bucket_target)))

// Grab the file scan node from the executed plan to see which bucket files
// the filter actually selects.
import org.apache.spark.sql.execution.FileSourceScanExec
val sparkplan = df.queryExecution.executedPlan
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get

import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

// Collect the paths of the selected bucket files.
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)

// Re-read just those files as a plain (non-bucketed) source so Spark can
// split the read across many tasks.
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "false")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))
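
As an aside, not part of the workaround itself: one way to sanity-check that the 
bucket filter actually pruned files is to look at the scan node's metadata (the 
"SelectedBucketsCount" entry is what recent Spark versions report there, assuming 
it is present for this plan), and to compare the manual read against the original 
query:

// Rough check only; "SelectedBucketsCount" key assumed from FileSourceScanExec metadata.
scan.metadata.get("SelectedBucketsCount").foreach(println)  // e.g. "1 out of 200"
// The manual file-level read should return the same rows as the original query.
assert(result_df.count() == df.count())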


> Decouple bucket filter pruning and bucket table scan
> ----------------------------------------------------
>
>                 Key: SPARK-32985
>                 URL: https://issues.apache.org/jira/browse/SPARK-32985
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Assignee: Cheng Su
>            Priority: Minor
>             Fix For: 3.2.0
>
>
> As a followup from the discussion in 
> [https://github.com/apache/spark/pull/29804#discussion_r493100510] . 
> Currently in the data source v1 file scan `FileSourceScanExec`, bucket filter 
> pruning only takes effect with bucketed table scan - 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542]
>  . However, this is unnecessary, as bucket filter pruning can also happen if 
> we disable bucketed table scan. This helps queries leverage the benefit of 
> bucket filter pruning, saving CPU/IO by not reading unnecessary bucket files, 
> without being bound to bucketed table scan when task parallelism is a 
> concern.
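
A minimal sketch of what this decoupling enables, assuming the table and column 
names from the workaround above and the standard spark.sql.sources.bucketing.enabled 
setting; with the fix in 3.2.0, the filter on the bucket column should still prune 
bucket files even when bucketed table scan is disabled:

import org.apache.spark.sql.functions.col

// Disable bucketed table scan so the read is no longer limited to one task per bucket.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")

// Per the ticket description, the bucket filter below should still prune
// unnecessary bucket files even though the scan itself is not a bucketed scan.
val pruned = spark.table("ex_db.ex_tbl")
  .where(col("date") === "2021-01-01" && col("bucket_col") === "valuex")
pruned.explain()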


