We really need some documentation that defines what non-deterministic means. AFAIK, a non-deterministic expression may produce a different result for the same input row, depending on which input rows were processed before it.

The optimizer tries its best not to change the input sequence of non-deterministic expressions. For example, `df.select(..., nonDeterministicExpr).filter(...)` can't do filter pushdown. The exception is filter conditions: for `df.filter(nonDeterministic && cond)`, Spark still pushes down `cond`, even though that may change the input sequence seen by the first condition. This is to respect the SQL semantics that filter conditions ANDed together are order-insensitive. Users should write `df.filter(nonDeterministic).filter(cond)` to guarantee the order.
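A minimal sketch of the two spellings (`rand() < 0.5` here is just a stand-in for any non-deterministic predicate, and `$"id" > 10` for `cond`, against a local session):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.range(100).toDF("id")

// ANDed conditions are order-insensitive to the optimizer: the
// deterministic condition may be evaluated below the non-deterministic
// one, changing the rows that rand() sees.
val combined = df.filter(rand() < 0.5 && $"id" > 10)

// Chained filters guarantee the order: rand() is evaluated against
// every input row, then the deterministic condition is applied on top.
val ordered = df.filter(rand() < 0.5).filter($"id" > 10)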
For this particular problem, I think it's not only about UDFs, but a general problem with how Spark collapses Projects. For example, Spark optimizes `df.select('a * 5 as 'b).select('b + 2, 'b + 3)` into `df.select('a * 5 + 2, 'a * 5 + 3)` and executes `'a * 5` twice. I think we should revisit this optimization and think about when it is safe to collapse.
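A quick way to see the duplication (a sketch; the trivial UDF stands in for an expensive function, and the printed plans assume the collapse behavior described above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.range(5).toDF("a")

// CollapseProject merges the two selects by inlining the alias `b`,
// so the optimized plan computes `a * 5` once per output column.
df.select(($"a" * 5).as("b"))
  .select(($"b" + 2).as("b2"), ($"b" + 3).as("b3"))
  .explain(true)

// The same shape with a deterministic UDF: after the Projects are
// collapsed, the UDF appears once per referencing column, i.e. it is
// invoked twice per row.
val costly = udf((i: Long) => i * 5) // stand-in for an expensive UDF
df.select(costly($"a").as("b"))
  .select(($"b" + 2).as("b2"), ($"b" + 3).as("b3"))
  .explain(true)

In both optimized plans the inner expression shows up once per output column, which is exactly the double evaluation Enrico describes below.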
On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel <rbereng...@gmail.com> wrote:

> That was very interesting, thanks Enrico.
>
> Sean, IIRC it also prevents pushdown of the UDF in Catalyst in some cases.
>
> Regards,
>
> Ruben
>
> > On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
> >
> > Interesting, what does non-deterministic do except have this effect? Aside from the naming, it could be a fine use of this flag, if that's all it effectively does. I'm not sure I'd introduce another flag with the same semantics just over naming. If anything, 'expensive' also isn't the right word; it's more like 'try not to evaluate multiple times'.
> >
> > Why isn't caching the answer? I realize it's big, but you can cache to disk. This may be faster than whatever plan reordering has to happen to evaluate it once.
> >
> > Usually I'd say: can you redesign your UDF and code to be more efficient, or use a bigger cluster if that's really what you need?
> >
> > At first look, no, I don't think this Spark-side naming workaround for your use case is worthwhile. There are existing better solutions.
> >
> > On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
> >>
> >> Hi all,
> >>
> >> Running expensive deterministic UDFs that return complex types, followed by multiple references to those results, causes Spark to evaluate the UDF multiple times per row. This has been reported and discussed before: SPARK-18748, SPARK-17728.
> >>
> >> val f: Int => Array[Int]
> >> val udfF = udf(f)
> >> df
> >>   .select($"id", udfF($"id").as("array"))
> >>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
> >>
> >> A common approach to make Spark evaluate the UDF only once is to cache the intermediate result right after projecting the UDF:
> >>
> >> df
> >>   .select($"id", udfF($"id").as("array"))
> >>   .cache()
> >>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
> >>
> >> There are scenarios where this intermediate result is too big for the cluster to cache. Also, this is bad design.
> >>
> >> The best approach is to mark the UDF as non-deterministic. Then Spark optimizes the query so that the UDF gets called only once per row, which is exactly what you want.
> >>
> >> val udfF = udf(f).asNondeterministic()
> >>
> >> However, stating that a UDF is non-deterministic although it clearly is deterministic is counter-intuitive and makes your code harder to read.
> >>
> >> Spark should provide a better way to flag the UDF. Calling it expensive would be better naming here:
> >>
> >> val udfF = udf(f).asExpensive()
> >>
> >> I understand that deterministic is a notion that Expression provides, and that there is no equivalent to expensive that is understood by the optimizer. However, asExpensive() could simply set ScalaUDF.udfDeterministic = deterministic && !expensive, which implements the best available approach behind better naming.
> >>
> >> What are your thoughts on asExpensive()?
> >>
> >> Regards,
> >> Enrico