I don't fully understand your question, but you may want to check out this JIRA,
especially the comments area. There is some discussion there of why a UDF could
be executed multiple times by Spark.
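
As a rough illustration of one mitigation for repeated UDF evaluation, the
sketch below marks a UDF as nondeterministic and evaluates it in its own
projection before exploding. This is an assumption about what may help, not
something taken from the JIRA. `fooImpl`, the sample data and the column
names are hypothetical, and `asNondeterministic()` requires Spark 2.3 or later.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}

object NondeterministicUdfSketch {
  // Hypothetical stand-in for the UDF in the question.
  def fooImpl(x: Long): Seq[Long] = Seq(x, x + 1)

  // Marking the UDF nondeterministic (Spark 2.3+) tells the optimizer not to
  // treat repeated evaluations of it as interchangeable.
  val foo = udf(fooImpl _).asNondeterministic()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nondeterministic-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    val table2 = Seq(1L, 2L).toDF("x")

    // Evaluate the UDF in its own projection, then explode the resulting column.
    val exploded = table2
      .withColumn("arr", foo(col("x")))
      .select(explode(col("arr")).as("y"))

    // Print the plans to see where the UDF call ends up.
    exploded.explain(true)
    spark.stop()
  }
}
```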


From: tan shai <>
Sent: Tuesday, February 27, 2018 4:19 AM
Subject: Re: CATALYST rule join


I need to write a rule to customize the join function using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a UDF on the column called x; this UDF returns an array

- Execute an explode function on the new column

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.
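
For comparison, the same query can be sketched with the DataFrame API, without
a custom Catalyst rule. This is only a minimal sketch: `fooImpl`, the sample
rows and the output column name `x_exploded` are hypothetical placeholders for
the real `foo` and `table2`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}

object ExplodeUdfSketch {
  // Hypothetical stand-in for `foo`: returns the input and its two successors.
  def fooImpl(x: Long): Seq[Long] = Seq(x, x + 1, x + 2)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical contents of table2, with a single Long column named x.
    val table2 = Seq(10L, 20L).toDF("x")
    val foo = udf(fooImpl _)

    // DataFrame equivalent of: SELECT EXPLODE(foo(x)) FROM table2
    // Each input row yields one output row per element of the returned array.
    val exploded = table2.select(explode(foo(col("x"))).as("x_exploded"))
    exploded.show()

    spark.stop()
  }
}
```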

I have this rule:

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case join @ Join(left, right, _, Some(condition)) =>
          val attr = right.outputSet.find(x => x.toString().contains("x"))
          val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
          val explode = Explode(udf)
          val resolvedGenerator = Generate(explode, true, false, qualifier = None,
            udf.references.toSeq, right)
          var newRight = Project(resolvedGenerator.output, resolvedGenerator)
          Join(left, newRight, Inner, Option(condition))


But the problem is that the `Generate explode` operation appears many times in
the physical plan.
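
To see where the repetition first shows up, the `Generate` nodes can be counted
in both the optimized logical plan and the physical plan. The sketch below is
one way to do that. The helper names and sample data are hypothetical, and a
built-in `array` expression stands in for the UDF.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Generate
import org.apache.spark.sql.execution.GenerateExec
import org.apache.spark.sql.functions.{array, col, explode}

object CountGenerateSketch {
  // Number of Generate nodes in the optimized logical plan.
  def logicalGenerates(df: DataFrame): Int =
    df.queryExecution.optimizedPlan.collect { case g: Generate => g }.size

  // Number of GenerateExec nodes in the physical plan chosen by the planner.
  def physicalGenerates(df: DataFrame): Int =
    df.queryExecution.sparkPlan.collect { case g: GenerateExec => g }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("count-generate-sketch")
      .getOrCreate()
    import spark.implicits._

    val left = Seq(1L, 2L).toDF("k")
    // Built-in array(...) is used here in place of the UDF from the question.
    val right = Seq(1L, 2L).toDF("x")
      .select(explode(array(col("x"), col("x") + 1)).as("k"))
    val joined = left.join(right, "k")

    println(s"logical Generate nodes:  ${logicalGenerates(joined)}")
    println(s"physical Generate nodes: ${physicalGenerates(joined)}")
    spark.stop()
  }
}
```

If the logical count is already greater than one, the duplication is introduced
during optimization, not during physical planning. One possibility worth
checking, which is an assumption and not something this thread confirms:
optimizer batches can apply a rule repeatedly, and a rule that matches the
`Join` it has just produced would add another `Generate` on every pass.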

Do you have any other ideas? Maybe a way to rewrite the code?

Thank you

