That was very interesting, thanks Enrico.

Sean, IIRC it also prevents Catalyst from pushing predicates down past the UDF in some cases.
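
A minimal sketch of what I mean, assuming a DataFrame df with an integer
id column and spark.implicits._ in scope (compare the plans with and
without asNondeterministic()):

    import org.apache.spark.sql.functions.udf

    val udfF = udf((i: Int) => Array(i, i * i)).asNondeterministic()
    df.select($"id", udfF($"id").as("array"))
      .filter($"id" > 0)  // Catalyst will not push this filter through
                          // the non-deterministic projection
      .explain(true)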

Regards, 

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
> 
> Interesting, what does non-deterministic do except have this effect?
> Aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything, 'expensive' isn't
> the right word either; it's more like 'try not to evaluate multiple times'.
> 
> Why isn't caching the answer? I realize the intermediate result is big,
> but you can cache to disk. That may be faster than whatever plan
> reordering has to happen to evaluate the UDF only once.
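> 
> A minimal sketch of that disk-only variant, reusing the udfF from the
> example below (StorageLevel.DISK_ONLY keeps the cached rows on local
> disk instead of in memory):
> 
>    import org.apache.spark.storage.StorageLevel
> 
>    df
>      .select($"id", udfF($"id").as("array"))
>      .persist(StorageLevel.DISK_ONLY)
>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))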
> 
> Usually I'd say: can you redesign your UDF and code to be more
> efficient too? Or use a bigger cluster, if that's really what you need.
> 
> At first look, no, I don't think this Spark-side naming workaround is
> worthwhile for your use case. There are existing, better solutions.
> 
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
>> 
>> Hi all,
>> 
>> Running an expensive deterministic UDF that returns a complex type, and then 
>> referencing that result multiple times, causes Spark to evaluate the UDF 
>> multiple times per row. This has been reported and discussed before: 
>> SPARK-18748, SPARK-17728
>> 
>>    val f: Int => Array[Int] = i => Array(i, i * i)  // some expensive function
>>    val udfF = udf(f)
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
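>> 
>> One way to see the double evaluation (a sketch; in the optimized plan,
>> UDF(id) appears twice, once for each array index):
>> 
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>      .explain(true)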
>> 
>> A common approach to make Spark evaluate the UDF only once is to cache the 
>> intermediate result right after projecting the UDF:
>> 
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .cache()
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> There are scenarios where this intermediate result is too big for the 
>> cluster to cache. Caching solely to force a single evaluation is also bad design.
>> 
>> The best approach is to mark the UDF as non-deterministic. Then Spark 
>> optimizes the query so that the UDF gets called only once per row, 
>> which is exactly what you want.
>> 
>>    val udfF = udf(f).asNondeterministic()
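>> 
>> The query then reads the same as before, just without the cache():
>> 
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))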
>> 
>> However, declaring a UDF non-deterministic when it clearly is 
>> deterministic is counter-intuitive and makes the code harder to read.
>> 
>> Spark should provide a better way to flag such a UDF. Calling it expensive 
>> would be better naming here:
>> 
>>    val udfF = udf(f).asExpensive()
>> 
>> I understand that deterministic is a notion provided by Expression, and 
>> that there is no equivalent notion of expensive understood by the optimizer. 
>> However, asExpensive() could simply set ScalaUDF.udfDeterministic = 
>> deterministic && !expensive, which implements the best available approach 
>> behind better naming.
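>> 
>> A self-contained sketch of the idea (hypothetical, of course; asExpensive() 
>> does not exist, and the names are modelled on asNondeterministic()):
>> 
>>    // toy stand-in for UserDefinedFunction, to illustrate the flags
>>    case class MyUDF(f: AnyRef,
>>                     deterministic: Boolean = true,
>>                     expensive: Boolean = false) {
>>      def asNondeterministic(): MyUDF = copy(deterministic = false)
>>      def asExpensive(): MyUDF = copy(expensive = true)
>>      // what would be passed on to ScalaUDF as udfDeterministic:
>>      def udfDeterministic: Boolean = deterministic && !expensive
>>    }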
>> 
>> What are your thoughts on asExpensive()?
>> 
>> Regards,
>> Enrico
> 


---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
