I don't fully understand your question, but you may want to check out this JIRA:
https://issues.apache.org/jira/browse/SPARK-17728, especially the comments
section. There is some discussion there of the logic behind why a UDF can be
executed multiple times by Spark.
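
If it helps, one workaround people mention in that ticket is to mark the UDF as
non-deterministic so the optimizer won't plan its evaluation more than once. A
rough sketch only (assuming Spark 2.3+, and `table2` loaded as a DataFrame; the
function body here is made up):

    import org.apache.spark.sql.functions.{col, explode, udf}

    // Hypothetical UDF returning an array; asNondeterministic() tells the
    // optimizer it must not duplicate or re-order its evaluation.
    val foo = udf((x: Long) => Array(x, x + 1L)).asNondeterministic()

    val exploded = table2.select(explode(foo(col("x"))).as("x"))

Not sure it applies to your case, but it may be worth a look.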

Yong

________________________________
From: tan shai <tan.shai...@gmail.com>
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule to customize the join operation using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a UDF on the column called x; this UDF returns an array

- Execute an explode function on the new column

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.
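
For reference, the DataFrame equivalent of that query would look roughly like
this (only a sketch, assuming `table2` is available as a DataFrame and `f` is
the function backing `foo`, returning a Seq[Long]):

    import org.apache.spark.sql.functions.{col, explode, udf}

    // foo(x) prepends x to the array of values produced by f(x)
    val fooUdf = udf((x: Long) => x +: f(x))
    val expanded = table2.select(explode(fooUdf(col("x"))).as("x"))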

I have this rule:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.{Explode, ScalaUDF}
    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.types.{ArrayType, LongType}

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Join(left, right, _, Some(condition)) =>
          // Pick the attribute named "x" from the right side of the join.
          val attr = right.output.find(_.name == "x").get

          // `f` is my function that returns the extra Long values for a given x.
          val udf = ScalaUDF((x: Long) => x +: f(x), ArrayType(LongType), Seq(attr))

          // Explode the array produced by the UDF into one row per element.
          // Generate(generator, join, outer, qualifier, generatorOutput, child)
          val explode = Explode(udf)
          val resolvedGenerator =
            Generate(explode, true, false, None, udf.references.toSeq, right)

          val newRight = Project(resolvedGenerator.output, resolvedGenerator)

          Join(left, newRight, Inner, Option(condition))
      }
    }
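
To test it, I register the rule through the experimental optimizer hook and
look at the plan (a sketch; `df1`, `df2` and the join key are placeholders):

    // Inject the rule into the optimizer and inspect the resulting plans.
    spark.experimental.extraOptimizations = Seq(JoinRule(spark))
    df1.join(df2, df1("x") === df2("x")).explain(true)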

But the problem is that the `Generate explode` operation appears many times in
the physical plan.


Do you have any other ideas? Maybe by rewriting the code?

Thank you

