[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Bellchambers updated SPARK-22446:
--------------------------------------
    Summary: Optimizer causing StringIndexerModel's indexer UDF to throw 
"Unseen label" exception incorrectly for filtered data.  (was: Optimizer 
causing StringIndexer's indexer UDF to throw "Unseen label" exception 
incorrectly for filtered data.)

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22446
>                 URL: https://issues.apache.org/jira/browse/SPARK-22446
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, Optimizer
>    Affects Versions: 2.0.0, 2.2.0
>         Environment: spark-shell, local mode, macOS Sierra 10.12.6
>            Reporter: Greg Bellchambers
>            Priority: Normal
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
>     val indexer = udf { label: String =>
>       if (labelToIndex.contains(label)) {
>         labelToIndex(label)
>       } else {
>         throw new SparkException(s"Unseen label: $label.")
>       }
>     }
> {code}
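> The `labelToIndex` map holds only the labels seen when the model was fit, so 
> any other label falls through to the else branch. As an illustrative sketch 
> (not the actual model internals), this is the lookup the UDF performs for the 
> example that follows, where only "London" and "New York" are seen at fit time:
> {code:java}
> // Hypothetical contents of labelToIndex for the model fitted below.
> val labelToIndex = Map("London" -> 0.0, "New York" -> 1.0)
> val index = (label: String) =>
>   labelToIndex.getOrElse(label, throw new RuntimeException(s"Unseen label: $label."))
> index("London")   // 0.0
> index("Bristol")  // throws "Unseen label: Bristol."
> {code}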
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>      | ("A", "London", "StrA"),
>      | ("B", "Bristol", null),
>      | ("C", "New York", "StrC")
>      | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more field]
> scala> df.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // then we remove the row with a null in the CONTENT column, which removes Bristol
> scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // now create a StringIndexer for the CITY column and fit to dfNoBristol
> scala> val model = {
>      | new StringIndexer()
>      | .setInputCol("CITY")
>      | .setOutputCol("CITYIndexed")
>      | .fit(dfNoBristol)
>      | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa23333fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> model.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 more fields]
> scala> dfWithIndex.show
> +---+--------+-------+-----------+
> | ID|    CITY|CONTENT|CITYIndexed|
> +---+--------+-------+-----------+
> |  A|  London|   StrA|        0.0|
> |  C|New York|   StrC|        1.0|
> +---+--------+-------+-----------+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in the `transform` method 
> throws an exception reporting the unseen label "Bristol". From the point of 
> view of the API user this makes no sense, because no "Bristol" value appears 
> when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Unseen label: Bristol.  To handle unseen labels, set Param handleInvalid to keep.
>       at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:222)
>       at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:208)
>       ... 13 more
> {code}
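> As an aside, the exception message itself suggests a workaround: setting the 
> handleInvalid Param. A minimal sketch follows (assuming the "keep" option is 
> available in the reporter's Spark 2.2.0); it should avoid the exception here, 
> but it does not address the underlying optimizer behaviour described below:
> {code:java}
> val tolerantModel = new StringIndexer()
>   .setInputCol("CITY")
>   .setOutputCol("CITYIndexed")
>   .setHandleInvalid("keep")  // unseen labels get a new index instead of throwing
>   .fit(dfNoBristol)
> tolerantModel.transform(dfNoBristol).filter($"CITYIndexed" === 1.0).count
> {code}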
> To understand what is happening here, note that an action is triggered when 
> we call `StringIndexer.fit()`, before the `$"CITYIndexed" === 1.0` filter is 
> applied, so the StringIndexerModel sees only "London" and "New York", as 
> expected. Now compare the query plans for `dfWithIndex` and 
> `dfWithIndex.filter($"CITYIndexed" === 1.0)`:
> {noformat}
> scala> dfWithIndex.explain
> == Physical Plan ==
> *Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
> +- *Filter isnotnull(_3#5)
>    +- LocalTableScan [_1#3, _2#4, _3#5]
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).explain
> == Physical Plan ==
> *Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
> +- *Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))
>    +- LocalTableScan [_1#3, _2#4, _3#5]
> {noformat}
> Note that in the latter plan the optimizer has pushed the filter 
> `$"CITYIndexed" === 1.0` down so that it is evaluated at the same stage as our 
> null filter (`Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))`).
> With a debugger I have seen that both operands of `&&` are executed on each 
> row of `df`: `isnotnull(_3#5)` and `UDF(_2#4) = 1.0`. Therefore, the UDF is 
> passed the label `Bristol` even though `isnotnull` returns false for that row.
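> The same pushdown can be illustrated without the ML wrapper, using a plain 
> Spark SQL UDF over the example data. This is only a sketch (the `cityIdx` UDF 
> and the `CITYIdx` column are made up for illustration), but the combined 
> filter should appear in the plan in the same way:
> {code:java}
> import org.apache.spark.sql.functions.udf
> // A plain UDF that fails on the value removed by the earlier null filter,
> // mimicking the indexer UDF above.
> val cityIdx = udf { city: String =>
>   if (city == "Bristol") throw new RuntimeException(s"Unseen label: $city.")
>   else if (city == "London") 0.0 else 1.0
> }
> val withIdx = df.filter($"CONTENT".isNotNull).withColumn("CITYIdx", cityIdx($"CITY"))
> // The filter on the UDF output is pushed below the projection and combined
> // with the null filter, just as in the plans above.
> withIdx.filter($"CITYIdx" === 1.0).explain
> {code}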
> If we cache the DataFrame `dfNoBristol` immediately after creating it, then 
> there is no longer an error because the optimizer does not attempt to call 
> the UDF on unseen data. The fact that we get different results depending on 
> whether or not we call cache is a cause for concern.
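> A minimal sketch of that caching workaround, reusing `df` and the 
> StringIndexer setup from the example above (the variable names here are new, 
> for illustration only):
> {code:java}
> // Caching the filtered DataFrame right after creating it means the pushed-down
> // filter is evaluated over the cached (Bristol-free) data, so the indexer UDF
> // never sees the unseen label.
> val dfNoBristolCached = df.filter($"CONTENT".isNotNull).cache()
> val modelCached = new StringIndexer()
>   .setInputCol("CITY")
>   .setOutputCol("CITYIndexed")
>   .fit(dfNoBristolCached)
> modelCached.transform(dfNoBristolCached).filter($"CITYIndexed" === 1.0).count  // no exception
> {code}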
> I have seen similar issues with pure Spark SQL DataFrame operations when the 
> DAG gets complicated (many joins and aggregations). These are harder to 
> reduce to such a simple example, but I plan to report them in the near 
> future.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
