I don't fully understand your question, but you may want to check out this JIRA:
https://issues.apache.org/jira/browse/SPARK-17728, especially the comments.
There is some discussion there of why a UDF can be executed multiple times by
Spark.
Yong
From: tan shai <tan.shai...@gmail.com>
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join
Hi,
I need to write a rule to customize the join function using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this process:
- Execute a UDF on the column called x; this UDF returns an array
- Execute an explode function on the new column
In SQL terms, my objective is to execute this query on the second table:
SELECT EXPLODE(foo(x)) FROM table2
where `foo` is a UDF that returns an array of elements.
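For reference, the SQL query above can also be written with the DataFrame API. This is only a minimal sketch of the intended result, not the Catalyst rule itself: the object name `ExplodeUdfSketch`, the helper `explodeFoo`, the sample rows, and the body of `foo` (which here just returns x and x + 1) are all hypothetical stand-ins.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, udf}

object ExplodeUdfSketch {
  // Hypothetical stand-in for foo: returns an array holding x and x + 1.
  val foo = udf((x: Long) => Seq(x, x + 1))

  // Equivalent of: SELECT EXPLODE(foo(x)) FROM table2
  def explodeFoo(table2: DataFrame): DataFrame =
    table2.select(explode(foo(col("x"))).as("x_exploded"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for table2 with a single Long column x.
    val table2 = Seq(1L, 2L).toDF("x")

    // Each input row yields one output row per array element.
    explodeFoo(table2).show()

    spark.stop()
  }
}
```

Comparing the physical plan of this version (`explodeFoo(table2).explain()`) with the plan produced by the rule may help show where the extra `Generate` nodes come from.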
I have this rule:
case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join @ Join(left, right, _, Some(condition)) =>
      val attr = right.outputSet.find(x => x.toString().contains("x"))
      val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
        Seq(attr.last.toAttribute))
      val explode = Explode(udf)
      val resolvedGenerator = Generate(explode, true, false, qualifier = None,
        udf.references.toSeq, right)
      val newRight = Project(resolvedGenerator.output, resolvedGenerator)
      Join(left, newRight, Inner, Option(condition))
  }
}
The problem is that the `Generate explode` operation appears many times in
the physical plan.
Do you have any other ideas? Perhaps the code needs to be rewritten.
Thank you