[ https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651877#comment-17651877 ]

Ohad Raviv commented on SPARK-41277:
------------------------------------

I managed to put together a quick-and-dirty solution, just to be able to test it
on existing processes.

I had to set `spark.sql.legacy.createHiveTableByDefault=false`, because the Hive
provider and Spark bucketing do not play nicely together (Spark uses a different
hash function than Hive, so Hive-bucketed tables cannot be used to avoid Spark
shuffles).

Then I added a custom optimization rule:
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.internal.SQLConf

object BucketingRule extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan transform {
      case c @ CreateDataSourceTableAsSelectCommand(table, SaveMode.ErrorIfExists, query, _)
          if query.resolved =>
        query match {
          // if the CTAS query is an aggregate, bucket the output table
          // by its grouping keys
          case Aggregate(grouping, _, _) =>
            val numBuckets = SQLConf.get.numShufflePartitions
            val bucketSpec = BucketSpec(numBuckets,
              grouping.map(_.asInstanceOf[AttributeReference].name), Nil)
            c.copy(table = table.copy(bucketSpec = Some(bucketSpec)))
          case _ => c
        }
    }
  }
}

spark.sessionState.experimentalMethods.extraOptimizations ++= BucketingRule :: Nil{code}
And it works on this mock:
{code:java}
(1 to 30).map(i => ("k_" + (i-(1-i%2)), "v_" + i))
  .toDF("id", "val")
  .createOrReplaceTempView("t")

spark.sql("create table tbl1 select id, max(val) val, count(1) cnt from t group by id")
spark.table("t").write.bucketBy(3, "id").saveAsTable("tbl2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val dfPlan = spark.sql("create table tbl3 as select tbl1.* from tbl1" +
  " join tbl2 on tbl1.id=tbl2.id")

dfPlan.explain(true)
spark.table("tbl3").show()
{code}
In the plan you can see that `tbl1` gets created as a bucketed table.
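To double-check, the bucketing metadata can also be inspected directly. This is a sketch that assumes the tables above exist in the session catalog:

{code:java}
// the bucket spec should appear in the table details
spark.sql("DESCRIBE EXTENDED tbl1").show(100, truncate = false)

// or programmatically via the catalog API: if the rule fired,
// the bucketing columns should contain "id"
val bucketCols = spark.catalog.listColumns("tbl1")
  .filter(_.isBucket).collect().map(_.name)
{code}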

I will check whether this yields any noticeable performance gain.

Meanwhile, could you suggest or point me to a better solution?

> Save and leverage shuffle key in tblproperties
> ----------------------------------------------
>
>                 Key: SPARK-41277
>                 URL: https://issues.apache.org/jira/browse/SPARK-41277
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.1
>            Reporter: Ohad Raviv
>            Priority: Minor
>
> I'm not sure if I'm missing something trivial.
> In a typical process, many datasets get materialized, many of them after a
> shuffle (e.g. a join). They are then involved in further actions, often keyed
> on the same columns.
> Wouldn't it make sense to save the shuffle key along with the table, to avoid
> unnecessary shuffles?
> The implementation also seems quite straightforward: just leverage the
> existing bucketing mechanism.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
