Re: CATALYST rule join

2018-02-27 Thread Yong Zhang
I don't fully understand your question, but you may want to check out this JIRA:
https://issues.apache.org/jira/browse/SPARK-17728, especially the comments.
There is some discussion there about why Spark may execute a UDF multiple
times.
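Below is a toy illustration in plain Scala of one general mechanism that can make a UDF run more than once per row. It is not Spark's implementation, and the names are hypothetical. If an optimizer rewrite inlines an aliased UDF result into each expression that references it, the UDF is called once per reference. Both functions return the same values; only the call count differs.

```scala
// Toy model only (not Spark code): if an optimizer substitutes an aliased
// expression into every place it is referenced, the function behind it is
// evaluated once per reference instead of once per row.
object InliningSketch {
  var calls = 0

  // Hypothetical UDF body that counts its own evaluations
  def udf(x: Long): Long = { calls += 1; x * 2 }

  // y = udf(x) computed once, then reused by two downstream expressions
  def computedOnce(x: Long): (Long, Boolean) = {
    val y = udf(x)
    (y + 1, y > 10)
  }

  // The same two expressions after substituting udf(x) for every use of y
  def inlined(x: Long): (Long, Boolean) = (udf(x) + 1, udf(x) > 10)

  def main(args: Array[String]): Unit = {
    calls = 0; computedOnce(7L); println(s"computed once: $calls call(s)")
    calls = 0; inlined(7L); println(s"inlined: $calls call(s)")
  }
}
```

Identical results with a different evaluation count are why an expensive or side-effecting UDF can behave unexpectedly after such rewrites.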

Yong


From: tan shai <tan.shai...@gmail.com>
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule that customizes the join using the Spark Catalyst
optimizer. The objective is to duplicate the rows of the second dataset using
this process:

- Execute a UDF on the column called x; this UDF returns an array

- Apply the explode function to the new array column

In SQL terms, my objective is to execute this query on the second table:

SELECT EXPLODE(foo(x)) from table2

where `foo` is a UDF that returns an array of elements.
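The two steps and the query above can be modeled on plain Scala collections. This is a minimal sketch, not Spark code: `Row2` and `f` are hypothetical stand-ins, since the real `f` is not shown. `foo` mirrors the UDF built in the rule (`x` followed by `f(x)`).

```scala
// Toy model of SELECT EXPLODE(foo(x)) FROM table2 on plain Scala collections.
// Row2 and f are hypothetical stand-ins; the real f is not shown in the post.
object ExplodeSketch {
  final case class Row2(x: Long, payload: String)

  // Hypothetical f: Long => Seq[Long]
  def f(x: Long): Seq[Long] = Seq(x * 10, x * 10 + 1)

  // foo(x) as built in JoinRule: x followed by the elements of f(x)
  def foo(x: Long): Seq[Long] = x +: f(x)

  // EXPLODE(foo(x)) while keeping each row's other columns, as Generate with
  // join = true does: one output row per array element.
  def explodeFoo(table2: Seq[Row2]): Seq[(Row2, Long)] =
    table2.flatMap(row => foo(row.x).map(elem => (row, elem)))

  def main(args: Array[String]): Unit =
    explodeFoo(Seq(Row2(1L, "a"), Row2(2L, "b"))).foreach(println)
}
```

Each input row appears once per element of `foo(x)`, so two rows become six here. A bare `SELECT EXPLODE(...)` would return only the exploded values. The rule instead keeps the other columns so that the join condition can still refer to them.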

I have this rule:

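import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Explode, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{ArrayType, LongType}

// Written against the Spark 2.2/2.3 Catalyst API.
// f: Long => Seq[Long] is my own function (not shown here).
case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Join(left, right, _, Some(condition)) =>
      // The column named exactly "x" on the right side (throws if it is missing)
      val attr = right.output.find(_.name == "x").get

      // foo(x): x followed by the elements of f(x)
      val udf = ScalaUDF((x: Long) => x +: f(x), ArrayType(LongType), Seq(attr))

      // EXPLODE(foo(x)): keep the right side's columns (join = true,
      // outer = false, no qualifier) and emit the elements as a new column;
      // the name x_exploded is arbitrary
      val explode = Explode(udf)
      val exploded = AttributeReference("x_exploded", LongType)()
      val resolvedGenerator = Generate(explode, true, false, None, Seq(exploded), right)

      val newRight = Project(resolvedGenerator.output, resolvedGenerator)
      Join(left, newRight, Inner, Some(condition))
  }
}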

The problem is that the `Generate explode` operation appears many times in
the physical plan.
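A likely cause, assuming the rule is registered through `spark.experimental.extraOptimizations`: in Spark 2.x those extra rules run in an optimizer batch with a fixed-point strategy. The rule is applied again and again until the plan stops changing or the iteration limit is reached. `JoinRule` matches every `Join` that has a condition, including the one it has just produced, so each pass wraps `right` in another `Generate`. The toy model below uses hypothetical classes, not Catalyst's. It reproduces that effect and shows that a guard making the rule idempotent lets it converge.

```scala
// Toy model only: these are NOT Spark's classes, just enough structure to show
// what happens when a rule is re-applied until the plan stops changing.
object FixedPointSketch {
  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class GenerateExplode(child: Plan) extends Plan
  final case class JoinNode(left: Plan, right: Plan) extends Plan

  // Like JoinRule: every join gets an explode on its right side.
  // (For brevity the toy rules only look at the root node.)
  val unguarded: Plan => Plan = {
    case JoinNode(l, r) => JoinNode(l, GenerateExplode(r))
    case other          => other
  }

  // Idempotent variant: leave a join alone if its right side is already exploded.
  val guarded: Plan => Plan = {
    case j @ JoinNode(_, GenerateExplode(_)) => j
    case JoinNode(l, r)                      => JoinNode(l, GenerateExplode(r))
    case other                               => other
  }

  // Re-apply `rule` until the plan stops changing or `maxIterations` is hit.
  def runToFixedPoint(rule: Plan => Plan, plan: Plan, maxIterations: Int): (Plan, Int) = {
    var current = plan
    var iterations = 0
    var changed = true
    while (changed && iterations < maxIterations) {
      val next = rule(current)
      changed = next != current
      current = next
      iterations += 1
    }
    (current, iterations)
  }

  def countGenerates(p: Plan): Int = p match {
    case Scan(_)            => 0
    case GenerateExplode(c) => 1 + countGenerates(c)
    case JoinNode(l, r)     => countGenerates(l) + countGenerates(r)
  }

  def main(args: Array[String]): Unit = {
    val plan = JoinNode(Scan("table1"), Scan("table2"))
    val (bad, badIters) = runToFixedPoint(unguarded, plan, maxIterations = 100)
    val (good, goodIters) = runToFixedPoint(guarded, plan, maxIterations = 100)
    println(s"unguarded: ${countGenerates(bad)} explode nodes after $badIters passes")
    println(s"guarded: ${countGenerates(good)} explode node after $goodIters passes")
  }
}
```

With a limit of 100, the unguarded rule stops only because of the cap, leaving 100 explode nodes. The guarded one converges on its second pass with a single node. In the real rule, an analogous (untested) guard could be a pattern guard on the `Join` case, such as `if right.find(_.isInstanceOf[Generate]).isEmpty`. Note that this would also skip right sides that already contained a `Generate` before the rule ran.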


Do you have any other ideas? Perhaps a way to rewrite the code?

Thank you


2018-02-25 23:08 GMT+01:00 tan shai <tan.shai...@gmail.com>:
Hi,

I need to write a rule that customizes the join using the Spark Catalyst
optimizer. The objective is to duplicate the rows of the second dataset using
this process:

- Execute a UDF on the column called x; this UDF returns an array

- Apply the explode function to the new array column

In SQL terms, my objective is to execute this query on the second table:

SELECT EXPLODE(foo(x)) from table2

where `foo` is a UDF that returns an array of elements.

I have this rule:

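import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Explode, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{ArrayType, LongType}

// Written against the Spark 2.2/2.3 Catalyst API.
// f: Long => Seq[Long] is my own function (not shown here).
case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Join(left, right, _, Some(condition)) =>
      // The column named exactly "x" on the right side (throws if it is missing)
      val attr = right.output.find(_.name == "x").get

      // foo(x): x followed by the elements of f(x)
      val udf = ScalaUDF((x: Long) => x +: f(x), ArrayType(LongType), Seq(attr))

      // EXPLODE(foo(x)): keep the right side's columns (join = true,
      // outer = false, no qualifier) and emit the elements as a new column;
      // the name x_exploded is arbitrary
      val explode = Explode(udf)
      val exploded = AttributeReference("x_exploded", LongType)()
      val resolvedGenerator = Generate(explode, true, false, None, Seq(exploded), right)

      val newRight = Project(resolvedGenerator.output, resolvedGenerator)
      Join(left, newRight, Inner, Some(condition))
  }
}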

The problem is that the `Generate explode` operation appears many times in
the physical plan.


Do you have any other ideas? Perhaps a way to rewrite the code?

Thank you.