Re: [DISCUSS] Expensive deterministic UDFs

2019-12-31 Thread Guy Khazma
Hi All,

For the use case where the expensive UDF has constant inputs (literals), we
have proposed the following JIRA and PR, which evaluates the UDF only once
in the driver:
https://issues.apache.org/jira/browse/SPARK-27692
https://github.com/apache/spark/pull/24593

If the optimization for UDFs is revisited, the above PR could be considered
as well.
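
A hedged sketch of the pattern that PR targets (the names and values here are
illustrative, not taken from the PR), assuming a DataFrame df as in the other
examples in this thread: a deterministic UDF applied only to literal arguments
produces the same value for every row, so it can in principle be evaluated
once on the driver instead of once per row on the executors.

    import org.apache.spark.sql.functions.{lit, udf}

    // Stand-in for an expensive, deterministic computation.
    val expensiveLookup = udf { (key: String) =>
      Thread.sleep(100)
      key.reverse
    }

    // Only literal inputs: the result is identical for every row, yet without
    // the optimization Spark still invokes the UDF once per row.
    val result = df.withColumn("config", expensiveLookup(lit("model-v2")))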

Thanks,
Guy







Re: [DISCUSS] Expensive deterministic UDFs

2019-11-08 Thread Enrico Minack
I agree that 'non-deterministic' is the right term for what it currently 
does: it marks an expression as non-deterministic (one that may return 
different values for the same input, e.g. rand()), and the optimizer does 
its best not to break semantics when moving such expressions around.


In the case of expensive deterministic UDFs, or any expensive deterministic 
expression, the optimizer should not multiply the effort. Even for cheap 
expressions like a * 5, where the performance impact is comparatively 
small, it simply should not execute things twice. So this is not only about 
expensive deterministic expressions, but about deterministic expressions 
that are referenced multiple times.


Pushing those expressions into other expressions that reference them is 
useful in order to simplify those other expressions, e.g. 
df.withColumn("b", not($"a")).where(not($"b")) will eliminate the double 
negation of a.
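
A minimal illustration of that simplification (assuming spark.implicits._ is 
in scope and a boolean column a):

    import org.apache.spark.sql.functions.not

    val filtered = df.withColumn("b", not($"a")).where(not($"b"))
    // Once the projected expression is pushed into the filter, the condition
    // becomes not(not($"a")), which the optimizer can simplify to just $"a",
    // so the double negation disappears from the plan.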


So if expressions are referenced multiple times, they should not be 
collapsed, unless the referencing expressions get simplified. And then the 
simplification must pay off against evaluating the referenced expression 
multiple times. This needs some kind of cost model, or at least heuristics.
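
A rough sketch (plain Scala, not Catalyst code) of the kind of heuristic this 
could mean; the cost threshold is invented for illustration:

    // Collapse two adjacent Projects only if every expression that would be
    // inlined is referenced at most once or is cheap enough to duplicate.
    final case class ExprInfo(referenceCount: Int, estimatedCost: Double)

    def shouldCollapse(inlined: Seq[ExprInfo], maxDuplicatedCost: Double = 10.0): Boolean =
      inlined.forall(e => e.referenceCount <= 1 || e.estimatedCost <= maxDuplicatedCost)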


In the case of UDFs, I think they should never be collapsed, because they 
cannot be used to simplify other expressions (can they?). They should 
rather be materialised as close to the first reference as possible. If 
executing the UDF and referencing it multiple times happen in the same 
stage, hence in the same generated code, we end up with the perfect 
situation where the materialised result of each call is held in 
memory and processed by all referencing expressions.


Marking UDFs as expensive is not the right approach here, I agree; they 
should simply not be executed multiple times.


Enrico


Am 08.11.19 um 04:26 schrieb Wenchen Fan:
We really need some documents to define what non-deterministic means. 
AFAIK, non-deterministic expressions may produce a different result 
for the same input row, if the already processed input rows are 
different.


The optimizer tries its best to not change the input sequence 
of non-deterministic expressions. For example, `df.select(..., 
nonDeterministicExpr).filter...` can't do filter pushdown. An 
exception is filter condition. For `df.filter(nonDeterministic && 
cond)`, Spark still pushes down `cond` even if it may change the input 
sequence of the first condition. This is to respect the SQL semantic 
that filter conditions ANDed together are order-insensitive. Users 
should write `df.filter(nonDeterministic).filter(cond)` to guarantee 
the order.


For this particular problem, I think it's not only about UDF, but a 
general problem with how Spark collapses Projects.
For example, `df.select('a * 5 as 'b).select('b + 2, 'b + 3)`,  Spark 
optimizes it to `df.select('a * 5 + 2, 'a * 5 + 3)`, and execute 'a * 
5 twice.


I think we should revisit this optimization and think about when we 
can collapse.


On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel wrote:


That was very interesting, thanks Enrico.

Sean, IIRC it also prevents push down of the UDF in Catalyst in
some cases.

Regards,

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
>
> Interesting, what does non-deterministic do except have this effect?
> aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything 'expensive' also
> isn't the right word, more like 'try not to evaluate multiple times'.
>
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
>
> Usually I'd say, can you redesign your UDF and code to be more
> efficient too? or use a big a cluster if that's really what you need.
>
> At first look, no I don't think this Spark-side workaround for naming
> for your use case is worthwhile. There are existing better solutions.
>
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
>>
>> Hi all,
>>
>> Running expensive deterministic UDFs that return complex types,
>> followed by multiple references to those results cause Spark to
>> evaluate the UDF multiple times per row. This has been reported
>> and discussed before: SPARK-18748 SPARK-17728
>>
>>    val f: Int => Array[Int]
>>    val udfF = udf(f)
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> A common approach to make Spark evaluate the UDF only once is
>> to cache the intermediate result right after projecting the UDF:
>>
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .cache()
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))

Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Wenchen Fan
We really need some documents to define what non-deterministic means.
AFAIK, non-deterministic expressions may produce a different result for the
same input row, if the already processed input rows are different.
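
An illustrative example (assuming spark.implicits._ and a DataFrame df):
rand() is seeded per partition and advances its generator with every row it
processes, so the value it produces for a given row depends on which rows were
processed before it.

    import org.apache.spark.sql.functions.rand

    val withNoise = df.withColumn("noise", rand(42))
    // Re-ordering operators around this projection, e.g. filtering before vs.
    // after it, changes which rows the generator has already consumed and
    // therefore which value each surviving row gets.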

The optimizer tries its best to not change the input sequence
of non-deterministic expressions. For example, `df.select(...,
nonDeterministicExpr).filter...` can't do filter pushdown. An exception is
filter condition. For `df.filter(nonDeterministic && cond)`, Spark still
pushes down `cond` even if it may change the input sequence of the first
condition. This is to respect the SQL semantic that filter conditions ANDed
together are order-insensitive. Users should write `
df.filter(nonDeterministic).filter(cond)` to guarantee the order.
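
A hedged illustration of the difference (assuming spark.implicits._ and a
DataFrame df with an id column):

    import org.apache.spark.sql.functions.rand

    // ANDed conditions are treated as order-insensitive, so the deterministic
    // part may be pushed below (evaluated before) the non-deterministic one:
    df.filter((rand(42) < 0.5) && ($"id" > 10))

    // Chaining filters pins the order: rows are sampled first, then the
    // deterministic condition is applied to whatever survives:
    df.filter(rand(42) < 0.5).filter($"id" > 10)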

For this particular problem, I think it's not only about UDFs, but a general
problem with how Spark collapses Projects.
For example, given `df.select('a * 5 as 'b).select('b + 2, 'b + 3)`, Spark
optimizes it to `df.select('a * 5 + 2, 'a * 5 + 3)` and executes 'a * 5
twice.
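
The duplication is easy to see in the optimized plan (a small experiment,
assuming spark.implicits._ is in scope):

    val collapsed = df.select(($"a" * 5).as("b")).select($"b" + 2, $"b" + 3)
    collapsed.explain(true)
    // The optimized plan contains a single Project along the lines of
    //   Project [((a * 5) + 2) AS ..., ((a * 5) + 3) AS ...]
    // i.e. 'a * 5 appears, and is evaluated, once per output column.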

I think we should revisit this optimization and think about when we can
collapse.

On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel  wrote:

> That was very interesting, thanks Enrico.
>
> Sean, IIRC it also prevents push down of the UDF in Catalyst in some cases.
>
> Regards,
>
> Ruben
>
> > On 7 Nov 2019, at 11:09, Sean Owen  wrote:
> >
> > Interesting, what does non-deterministic do except have this effect?
> > aside from the naming, it could be a fine use of this flag if that's
> > all it effectively does. I'm not sure I'd introduce another flag with
> > the same semantics just over naming. If anything 'expensive' also
> > isn't the right word, more like 'try not to evaluate multiple times'.
> >
> > Why isn't caching the answer? I realize it's big, but you can cache to
> > disk. This may be faster than whatever plan reordering has to happen
> > to evaluate once.
> >
> > Usually I'd say, can you redesign your UDF and code to be more
> > efficient too? or use a big a cluster if that's really what you need.
> >
> > At first look, no I don't think this Spark-side workaround for naming
> > for your use case is worthwhile. There are existing better solutions.
> >
> > On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack 
> wrote:
> >>
> >> Hi all,
> >>
> >> Running expensive deterministic UDFs that return complex types,
> followed by multiple references to those results cause Spark to evaluate
> the UDF multiple times per row. This has been reported and discussed
> before: SPARK-18748 SPARK-17728
> >>
> >>val f: Int => Array[Int]
> >>val udfF = udf(f)
> >>df
> >>  .select($"id", udfF($"id").as("array"))
> >>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
> >>
> >> A common approach to make Spark evaluate the UDF only once is to cache
> the intermediate result right after projecting the UDF:
> >>
> >>df
> >>  .select($"id", udfF($"id").as("array"))
> >>  .cache()
> >>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
> >>
> >> There are scenarios where this intermediate result is too big for the
> cluster to cache. Also this is bad design.
> >>
> >> The best approach is to mark the UDF as non-deterministic. Then Spark
> optimizes the query in a way that the UDF gets called only once per row,
> exactly what you want.
> >>
> >>val udfF = udf(f).asNondeterministic()
> >>
> >> However, stating a UDF is non-deterministic though it clearly is
> deterministic is counter-intuitive and makes your code harder to read.
> >>
> >> Spark should provide a better way to flag the UDF. Calling it expensive
> would be a better naming here.
> >>
> >>val udfF = udf(f).asExpensive()
> >>
> >> I understand that deterministic is a notion that Expression provides,
> and there is no equivalent to expensive that is understood by the
> optimizer. However, that asExpensive() could just set the
> ScalaUDF.udfDeterministic = deterministic && !expensive, which implements
> the best available approach behind a better naming.
> >>
> >> What are your thoughts on asExpensive()?
> >>
> >> Regards,
> >> Enrico
> >


Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Rubén Berenguel
That was very interesting, thanks Enrico.

Sean, IIRC it also prevents push down of the UDF in Catalyst in some cases.

Regards, 

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen  wrote:
> 
> Interesting, what does non-deterministic do except have this effect?
> aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything 'expensive' also
> isn't the right word, more like 'try not to evaluate multiple times'.
> 
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
> 
> Usually I'd say, can you redesign your UDF and code to be more
> efficient too? or use a big a cluster if that's really what you need.
> 
> At first look, no I don't think this Spark-side workaround for naming
> for your use case is worthwhile. There are existing better solutions.
> 
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack  wrote:
>> 
>> Hi all,
>> 
>> Running expensive deterministic UDFs that return complex types, followed by 
>> multiple references to those results cause Spark to evaluate the UDF 
>> multiple times per row. This has been reported and discussed before: 
>> SPARK-18748 SPARK-17728
>> 
>>val f: Int => Array[Int]
>>val udfF = udf(f)
>>df
>>  .select($"id", udfF($"id").as("array"))
>>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> A common approach to make Spark evaluate the UDF only once is to cache the 
>> intermediate result right after projecting the UDF:
>> 
>>df
>>  .select($"id", udfF($"id").as("array"))
>>  .cache()
>>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> There are scenarios where this intermediate result is too big for the 
>> cluster to cache. Also this is bad design.
>> 
>> The best approach is to mark the UDF as non-deterministic. Then Spark 
>> optimizes the query in a way that the UDF gets called only once per row, 
>> exactly what you want.
>> 
>>val udfF = udf(f).asNondeterministic()
>> 
>> However, stating a UDF is non-deterministic though it clearly is 
>> deterministic is counter-intuitive and makes your code harder to read.
>> 
>> Spark should provide a better way to flag the UDF. Calling it expensive 
>> would be a better naming here.
>> 
>>val udfF = udf(f).asExpensive()
>> 
>> I understand that deterministic is a notion that Expression provides, and 
>> there is no equivalent to expensive that is understood by the optimizer. 
>> However, that asExpensive() could just set the ScalaUDF.udfDeterministic = 
>> deterministic && !expensive, which implements the best available approach 
>> behind a better naming.
>> 
>> What are your thoughts on asExpensive()?
>> 
>> Regards,
>> Enrico
> 



Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Sean Owen
Interesting, what does non-deterministic do except have this effect?
aside from the naming, it could be a fine use of this flag if that's
all it effectively does. I'm not sure I'd introduce another flag with
the same semantics just over naming. If anything 'expensive' also
isn't the right word, more like 'try not to evaluate multiple times'.

Why isn't caching the answer? I realize it's big, but you can cache to
disk. This may be faster than whatever plan reordering has to happen
to evaluate once.
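
A sketch of that disk-only variant, reusing the udfF/df example from Enrico's
mail quoted below: persist() keeps the UDF at one evaluation per row without
holding the intermediate result in memory.

    import org.apache.spark.storage.StorageLevel

    df
      .select($"id", udfF($"id").as("array"))
      .persist(StorageLevel.DISK_ONLY)
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))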

Usually I'd say, can you redesign your UDF and code to be more
efficient too? or use a bigger cluster if that's really what you need.

At first look, no I don't think this Spark-side workaround for naming
for your use case is worthwhile. There are existing better solutions.

On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack  wrote:
>
> Hi all,
>
> Running expensive deterministic UDFs that return complex types, followed by 
> multiple references to those results cause Spark to evaluate the UDF multiple 
> times per row. This has been reported and discussed before: SPARK-18748 
> SPARK-17728
>
> val f: Int => Array[Int]
> val udfF = udf(f)
> df
>   .select($"id", udfF($"id").as("array"))
>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>
> A common approach to make Spark evaluate the UDF only once is to cache the 
> intermediate result right after projecting the UDF:
>
> df
>   .select($"id", udfF($"id").as("array"))
>   .cache()
>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>
> There are scenarios where this intermediate result is too big for the cluster 
> to cache. Also this is bad design.
>
> The best approach is to mark the UDF as non-deterministic. Then Spark 
> optimizes the query in a way that the UDF gets called only once per row, 
> exactly what you want.
>
> val udfF = udf(f).asNondeterministic()
>
> However, stating a UDF is non-deterministic though it clearly is 
> deterministic is counter-intuitive and makes your code harder to read.
>
> Spark should provide a better way to flag the UDF. Calling it expensive would 
> be a better naming here.
>
> val udfF = udf(f).asExpensive()
>
> I understand that deterministic is a notion that Expression provides, and 
> there is no equivalent to expensive that is understood by the optimizer. 
> However, that asExpensive() could just set the ScalaUDF.udfDeterministic = 
> deterministic && !expensive, which implements the best available approach 
> behind a better naming.
>
> What are your thoughts on asExpensive()?
>
> Regards,
> Enrico
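
For reference, a minimal user-side sketch of the asExpensive() idea quoted
above. This is not existing Spark API; with today's public API the
hypothetical method can only delegate to asNondeterministic(), which mirrors
the suggested udfDeterministic = deterministic && !expensive.

    import org.apache.spark.sql.expressions.UserDefinedFunction

    object ExpensiveUdfSyntax {
      implicit class RichUdf(private val underlying: UserDefinedFunction) extends AnyVal {
        // Hypothetical: mark a deterministic UDF as "expensive". Today the only
        // public lever with this effect is asNondeterministic().
        def asExpensive(): UserDefinedFunction = underlying.asNondeterministic()
      }
    }

    // Usage (hypothetical):
    //   import ExpensiveUdfSyntax._
    //   val udfF = udf(f).asExpensive()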
