That was very interesting, thanks Enrico. Sean, IIRC marking the UDF non-deterministic also prevents pushdown of the UDF in Catalyst in some cases.
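The double-evaluation issue discussed in this thread can be illustrated without Spark by a plain-Scala analogy (the object and names below, `UdfEvalSketch`, `expensiveF`, `callCount`, are illustrative, not Spark API): referencing two fields of an expensive function's result triggers two calls unless the result is materialized once, which is what caching or the non-deterministic flag achieves on the Spark side.

```scala
object UdfEvalSketch {
  var callCount = 0

  // Stand-in for an expensive deterministic UDF returning a complex type.
  def expensiveF(id: Int): Array[Int] = {
    callCount += 1
    Array(id, id * 2)
  }

  def main(args: Array[String]): Unit = {
    // Analogous to selecting $"array"(0) and $"array"(1) from separate
    // references to the UDF: the function runs once per reference.
    callCount = 0
    val a0 = expensiveF(7)(0)
    val a1 = expensiveF(7)(1)
    println(callCount) // 2

    // Analogous to caching the projected result (or the optimizer barrier
    // from asNondeterministic()): materialize once, then project both fields.
    callCount = 0
    val arr = expensiveF(7)
    val b0 = arr(0)
    val b1 = arr(1)
    println(callCount) // 1
  }
}
```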
Regards,
Ruben

> On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
>
> Interesting, what does non-deterministic do except have this effect?
> Aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything, 'expensive' also
> isn't the right word; more like 'try not to evaluate multiple times'.
>
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate it once.
>
> Usually I'd say: can you redesign your UDF and code to be more
> efficient, or use as big a cluster as you really need?
>
> At first look, no, I don't think this Spark-side workaround over naming
> is worthwhile for your use case. There are existing, better solutions.
>
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
>>
>> Hi all,
>>
>> Running an expensive deterministic UDF that returns a complex type, followed
>> by multiple references to that result, causes Spark to evaluate the UDF
>> multiple times per row. This has been reported and discussed before:
>> SPARK-18748, SPARK-17728.
>>
>> val f: Int => Array[Int]
>> val udfF = udf(f)
>> df
>>   .select($"id", udfF($"id").as("array"))
>>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> A common approach to make Spark evaluate the UDF only once is to cache the
>> intermediate result right after projecting the UDF:
>>
>> df
>>   .select($"id", udfF($"id").as("array"))
>>   .cache()
>>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> There are scenarios where this intermediate result is too big for the
>> cluster to cache. It is also bad design.
>>
>> The best approach is to mark the UDF as non-deterministic. Spark then
>> optimizes the query so that the UDF is called only once per row,
>> which is exactly what you want.
>>
>> val udfF = udf(f).asNondeterministic()
>>
>> However, stating that a UDF is non-deterministic when it clearly is
>> deterministic is counter-intuitive and makes your code harder to read.
>>
>> Spark should provide a better way to flag the UDF. Calling it expensive
>> would be better naming here:
>>
>> val udfF = udf(f).asExpensive()
>>
>> I understand that deterministic is a notion that Expression provides, and
>> there is no equivalent to expensive that is understood by the optimizer.
>> However, asExpensive() could simply set ScalaUDF.udfDeterministic =
>> deterministic && !expensive, which implements the best available approach
>> behind better naming.
>>
>> What are your thoughts on asExpensive()?
>>
>> Regards,
>> Enrico
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> ---------------------------------------------------------------------