[ https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651877#comment-17651877 ]
Ohad Raviv commented on SPARK-41277:
------------------------------------

I managed to put together a quick-and-dirty solution, just to be able to test it on existing processes. I had to set `{_}spark.sql.legacy.createHiveTableByDefault=false{_}`, as the Hive provider, Spark, and bucketing do not play nicely together (Spark uses a different hash function than Hive). Then I added a custom optimization rule:

{code:java}
object BucketingRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan transform {
      case c @ CreateDataSourceTableAsSelectCommand(table, SaveMode.ErrorIfExists, query, _)
          if query.resolved =>
        query match {
          case Aggregate(grouping, _, _) =>
            val numBuckets = SQLConf.get.numShufflePartitions
            val bucketSpec = BucketSpec(
              numBuckets,
              grouping.map(_.asInstanceOf[AttributeReference].name),
              Nil)
            c.copy(table = table.copy(bucketSpec = Some(bucketSpec)))
          case _ => c
        }
    }
  }
}

spark.sessionState.experimentalMethods.extraOptimizations ++= BucketingRule :: Nil{code}

And it works on this mock:

{code:java}
(1 to 30).map(i => ("k_" + (i - (1 - i % 2)), "v_" + i))
  .toDF("id", "val")
  .createOrReplaceTempView("t")

spark.sql("create table tbl1 select id, max(val) val, count(1) cnt from t group by id")
spark.table("t").write.bucketBy(3, "id").saveAsTable("tbl2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val dfPlan = spark.sql("create table tbl3 as select tbl1.* from tbl1" +
  " join tbl2 on tbl1.id=tbl2.id")
dfPlan.explain(true)
spark.table("tbl3").show()
{code}

You can see that `tbl1` gets created as a bucketed table. I will try to see if we get any noticeable performance gain. Meanwhile, could you suggest or point me to a better solution?
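For context on why the Hive provider had to be avoided: Spark and Hive assign rows to buckets with different hash functions, so a table bucketed by one engine is not valid bucketed input for the other. A minimal, self-contained sketch of the mismatch (illustrative only: `MurmurHash3.stringHash` and `String.hashCode` here are stand-ins; Spark's actual Murmur3 variant and Hive's hash differ in detail from both):

{code:java}
import scala.util.hashing.MurmurHash3

// Illustrative only: neither function below is the exact Spark or Hive
// implementation; they stand in for "two different hash functions".
def murmurBucket(key: String, numBuckets: Int): Int = {
  val h = MurmurHash3.stringHash(key, 42)       // Murmur3-style hash
  ((h % numBuckets) + numBuckets) % numBuckets  // non-negative modulo
}

def javaHashBucket(key: String, numBuckets: Int): Int = {
  val h = key.hashCode                          // Java-hashCode-style hash
  ((h % numBuckets) + numBuckets) % numBuckets
}

val keys = (1 to 10).map("k_" + _)
val murmurBuckets = keys.map(murmurBucket(_, 3))
val javaBuckets   = keys.map(javaHashBucket(_, 3))
// The two assignments generally disagree, so a reader that assumes one
// function looks in the wrong bucket files for rows written by the other.
{code}

This is why mixing Hive-provider tables with Spark-written bucketed data silently breaks the bucketing contract rather than failing loudly.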
> Save and leverage shuffle key in tblproperties
> ----------------------------------------------
>
>                 Key: SPARK-41277
>                 URL: https://issues.apache.org/jira/browse/SPARK-41277
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.1
>            Reporter: Ohad Raviv
>            Priority: Minor
>
> I'm not sure if I'm missing something trivial.
> In a typical process, many datasets get materialized, many of them after a
> shuffle (e.g. a join). They are then involved in further actions, often
> keyed on the same columns.
> Wouldn't it make sense to save the shuffle key along with the table to avoid
> unnecessary shuffles?
> Also, the implementation seems quite straightforward: just leverage the
> bucketing mechanism.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
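The description's idea, recording the shuffle key of a materialized table and consulting it at planning time, can be sketched independently of Spark. This is a hypothetical, self-contained illustration (the names `TableBucketSpec` and `needsShuffle` are made up, not Spark APIs; Spark's real logic in `EnsureRequirements` is considerably more involved):

{code:java}
// Hypothetical sketch, not Spark's API: the "shuffle key" the issue
// proposes to keep in tblproperties, modeled as a small case class.
case class TableBucketSpec(numBuckets: Int, columns: Seq[String])

// A join can skip the exchange only when both sides are already bucketed
// the same way: same bucket count, and bucket columns matching the join keys.
def needsShuffle(
    left: Option[TableBucketSpec],
    right: Option[TableBucketSpec],
    joinKeys: Seq[String]): Boolean =
  (left, right) match {
    case (Some(l), Some(r)) =>
      !(l.numBuckets == r.numBuckets &&
        l.columns == joinKeys &&
        r.columns == joinKeys)
    case _ => true // an unbucketed side always needs a shuffle
  }

val bucketed = Some(TableBucketSpec(200, Seq("id")))
println(needsShuffle(bucketed, bucketed, Seq("id")))  // false: shuffle avoided
println(needsShuffle(bucketed, None, Seq("id")))      // true
println(needsShuffle(bucketed, Some(TableBucketSpec(100, Seq("id"))), Seq("id"))) // true
{code}

The payoff case is the first one: both tables were materialized with the same key and bucket count, so the stored spec lets the planner drop the exchange entirely.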