I don't fully understand your question, but you may want to check out this JIRA: https://issues.apache.org/jira/browse/SPARK-17728, especially the comments. There is some discussion there of why a UDF can be executed multiple times by Spark.
Yong

________________________________
From: tan shai <tan.shai...@gmail.com>
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule to customize the join function using the Spark Catalyst optimizer. The objective is to duplicate the second dataset using this process:

- Execute a udf on the column called x; this udf returns an array
- Execute an explode function on the new column

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) from table2

where `foo` is a udf that returns an array of elements.

I have this rule:

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

        case join@Join(left, right, _, Some(condition)) => {

          val attr = right.outputSet.find(x => x.toString().contains("x"))

          val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), Seq(attr.last.toAttribute))

          val explode = Explode(udf)

          val resolvedGenerator = Generate(explode, true, false, qualifier = None, udf.references.toSeq, right)

          var newRight = Project(resolvedGenerator.output, resolvedGenerator)

          Join(left, newRight, Inner, Option(condition))
        }
      }
    }

But the problem is that the operation `Generate explode` appears many times in the physical plan.

Do you have any other ideas? Maybe rewriting the code.

Thank you

2018-02-25 23:08 GMT+01:00 tan shai <tan.shai...@gmail.com>:
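
One possible explanation for the repeated `Generate explode`, offered as an assumption because the thread does not say how the rule is registered: if the rule runs in an optimizer batch that is re-applied until the plan stops changing, a rule whose output still matches its own `case Join(...)` pattern will wrap the right side again on every pass. The sketch below models that with toy classes only. `Plan`, `Relation`, `Generate`, `Join`, `transform` and `fixedPoint` here are hypothetical stand-ins written for this example, not Spark's Catalyst classes.

```scala
// Toy stand-ins for logical plan nodes. These are NOT Spark classes.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Generate(child: Plan) extends Plan   // stands in for Generate(Explode(udf), ...)
case class Join(left: Plan, right: Plan) extends Plan

object RuleFixedPointSketch {

  // Top-down rewrite: try the rule on this node, then recurse into the
  // children of whatever node results.
  def transform(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten = rule.applyOrElse(plan, (p: Plan) => p)
    rewritten match {
      case Join(l, r)  => Join(transform(l)(rule), transform(r)(rule))
      case Generate(c) => Generate(transform(c)(rule))
      case leaf        => leaf
    }
  }

  // Re-apply the rule until the plan stops changing or the iteration cap is hit.
  def fixedPoint(plan: Plan, maxIterations: Int)(rule: PartialFunction[Plan, Plan]): Plan = {
    var current = plan
    var changed = true
    var i = 0
    while (changed && i < maxIterations) {
      val next = transform(current)(rule)
      changed = next != current
      current = next
      i += 1
    }
    current
  }

  // Shaped like the rule in the question: its output is again a Join,
  // so it matches again on the next pass.
  val naiveRule: PartialFunction[Plan, Plan] = {
    case Join(l, r) => Join(l, Generate(r))
  }

  // Same rewrite, but skipped when the right side was already rewritten.
  val guardedRule: PartialFunction[Plan, Plan] = {
    case Join(l, r) if !r.isInstanceOf[Generate] => Join(l, Generate(r))
  }

  // Count Generate nodes anywhere in the plan.
  def countGenerates(plan: Plan): Int = plan match {
    case Generate(c) => 1 + countGenerates(c)
    case Join(l, r)  => countGenerates(l) + countGenerates(r)
    case Relation(_) => 0
  }

  def main(args: Array[String]): Unit = {
    val plan = Join(Relation("table1"), Relation("table2"))
    println(countGenerates(fixedPoint(plan, 5)(naiveRule)))   // one Generate per pass
    println(countGenerates(fixedPoint(plan, 5)(guardedRule))) // a single Generate
  }
}
```

In this toy model the naive rule adds one `Generate` per pass, while the guarded rule stops changing the plan after the first pass. If the same cause applies to the real rule, the guard would have to recognise the rule's own output, which in the question is a `Project` over a `Generate`, rather than a bare `Generate`.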