[ 
https://issues.apache.org/jira/browse/SPARK-16164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15347701#comment-15347701
 ] 

Cheng Lian commented on SPARK-16164:
------------------------------------

I'm posting a summary of our offline and GitHub discussion about this issue 
here for future reference.

The case described in this ticket can be generalized as the following query 
plan fragment:

{noformat}
Filter p1
 Project ...
  Filter p2
   <child>
{noformat}

In this fragment, predicate {{p1}} is actually a partial function, which means 
it's not defined for all possible input values (for example, {{scala.math.log}} 
is only defined for positive numbers). Thus this query plan relies on the fact 
that {{p1}} is evaluated after {{p2}}, so that {{p2}} can reduce the range of 
input values for {{p1}}. However, filter push-down and {{CombineFilters}} 
optimize this fragment into:

{noformat}
Project ...
 Filter p1 && p2
  <child>
{noformat}

which forces {{p1}} to be evaluated before {{p2}}. This causes the exception 
because now {{p1}} may receive invalid input values that are supposed to be 
filtered out by {{p2}}.
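
To make the failure mode concrete, here is a minimal sketch using plain Scala 
collections rather than Spark. The {{labelToIndex}} map and both predicates are 
hypothetical stand-ins, assuming {{p1}} throws on input that {{p2}} would have 
removed. It can be pasted into a Scala REPL.

{code}
import scala.util.Try

// Hypothetical lookup table, defined only for "0", "1" and "2".
val labelToIndex: Map[String, Double] = Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)

// p2: a total guard that drops labels missing from the table.
val p2: String => Boolean = labelToIndex.contains
// p1: a partial predicate; Map.apply throws NoSuchElementException
// for labels missing from the table.
val p1: String => Boolean = s => labelToIndex(s) > 1.0

val input = Seq("0", "1", "2", "3", "4")

// Original plan shape: p2 runs first, so p1 only ever sees valid input.
val original = input.filter(p2).filter(p1)

// Merged as "p1 && p2": p1 runs first and throws on "3".
val combined = Try(input.filter(s => p1(s) && p2(s)))

// Merged as "p2 && p1": short-circuiting keeps the original order.
val reordered = input.filter(s => p2(s) && p1(s))
{code}

Here {{original}} and {{reordered}} both contain only "2", while {{combined}} 
is a {{Failure}}. This only illustrates the ordering issue and is not meant to 
describe how Spark evaluates the plan internally.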

The problem here is that the SQL optimizer should have the freedom to 
rearrange the evaluation order of filter predicates. For example, we may want 
to evaluate cheap predicates first to short-circuit expensive ones. However, 
this kind of optimization essentially requires all filter predicates to be 
deterministic *full* (total) functions, which is violated in the above case 
({{p1}} is not a full function).
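
As a rough illustration of why the optimizer wants this freedom, the sketch 
below (plain Scala, not Spark) counts how often an expensive predicate runs 
under each ordering. The {{cheap}} and {{expensive}} predicates and the call 
counter are made up for the example; both predicates are total and 
deterministic, so reordering does not change the result.

{code}
// Counter used only to observe how often the expensive predicate runs.
var expensiveCalls = 0

// Hypothetical cheap predicate: keeps odd numbers.
val cheap: Int => Boolean = _ % 2 == 1
// Hypothetical expensive predicate: primality by trial division.
val expensive: Int => Boolean = { x =>
  expensiveCalls += 1
  x > 1 && (2 until x).forall(x % _ != 0)
}

val input = 1 to 100

// Expensive predicate first: it runs for every row.
expensiveCalls = 0
val expensiveFirst = input.filter(x => expensive(x) && cheap(x))
val callsExpensiveFirst = expensiveCalls

// Cheap predicate first: && short-circuits, so the expensive predicate
// runs only for rows that pass the cheap check.
expensiveCalls = 0
val cheapFirst = input.filter(x => cheap(x) && expensive(x))
val callsCheapFirst = expensiveCalls
{code}

Both orderings return the same rows, but the expensive predicate runs 100 
times in the first case and 50 times in the second.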

[PR #13872|https://github.com/apache/spark/pull/13872] "fixes" the specific 
case mentioned in this ticket by adjusting the optimization rule 
{{CombineFilters}}, which is safe. But in general, user applications should NOT 
assume that filter predicates are always evaluated in the order in which they 
appear in the original query.
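
One way for an application to avoid depending on evaluation order is to make 
the predicate itself total. The following is a minimal sketch in plain Scala 
under that assumption; {{labelToIndex}}, {{guard}} and {{p1Total}} are 
hypothetical names, not Spark APIs.

{code}
// Hypothetical lookup table, defined only for "0", "1" and "2".
val labelToIndex: Map[String, Double] = Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)

// Total guard, as before.
val guard: String => Boolean = labelToIndex.contains
// Total version of the predicate: Map.get returns None for missing labels,
// and exists on None is false, so no input can make it throw.
val p1Total: String => Boolean = s => labelToIndex.get(s).exists(_ > 1.0)

val input = Seq("0", "1", "2", "3", "4")

// Either evaluation order now yields the same rows.
val predicateFirst = input.filter(s => p1Total(s) && guard(s))
val guardFirst = input.filter(s => guard(s) && p1Total(s))
{code}

Both results contain only "2", regardless of which predicate is evaluated 
first.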


> CombineFilters should keep the ordering in the logical plan
> -----------------------------------------------------------
>
>                 Key: SPARK-16164
>                 URL: https://issues.apache.org/jira/browse/SPARK-16164
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Dongjoon Hyun
>             Fix For: 2.0.1, 2.1.0
>
>
> [~cmccubbin] reported a bug when he used StringIndexer in an ML pipeline with 
> additional filters. It seems that during filter pushdown, we changed the 
> ordering in the logical plan. I'm not sure whether we should treat this as a 
> bug.
> {code}
> // Needed outside environments that pre-import it; toDF and the 'idx
> // symbol syntax additionally rely on the session's implicits.
> import org.apache.spark.ml.feature.StringIndexer
>
> val df1 = (0 until 3).map(_.toString).toDF
> val indexer = new StringIndexer()
>   .setInputCol("value")
>   .setOutputCol("idx")
>   .setHandleInvalid("skip")
>   .fit(df1)
> val df2 = (0 until 5).map(_.toString).toDF
> val predictions = indexer.transform(df2)
> predictions.show() // this is okay
> predictions.where('idx > 2).show() // this will throw an exception
> {code}
> Please see the notebook at 
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/2159162931615821/588180/latest.html
>  for error messages.



