[
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356340#comment-16356340
]
Joseph K. Bradley commented on SPARK-22446:
-------------------------------------------
[~viirya] Did you confirm this is an issue in Spark 2.2 or earlier?
> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label"
> exception incorrectly for filtered data.
> -------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
> Issue Type: Bug
> Components: ML, Optimizer
> Affects Versions: 2.0.2, 2.1.2, 2.2.1
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
> Reporter: Greg Bellchambers
> Assignee: Liang-Chi Hsieh
> Priority: Major
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an
> "Unseen label" exception, even though the offending label is not present in
> the DataFrame being transformed.
> Here is the definition of the `indexer` UDF in the `transform` method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
>     labelToIndex(label)
>   } else {
>     throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
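> For illustration, here is a self-contained sketch of the lookup that the UDF
> performs; the map contents below are hypothetical stand-ins for the
> `labelToIndex` map that `fit` builds from the labels it sees:
> {code:java}
> import org.apache.spark.SparkException
>
> val labelToIndex = Map("London" -> 0.0, "New York" -> 1.0)
>
> def index(label: String): Double =
>   labelToIndex.getOrElse(label, throw new SparkException(s"Unseen label: $label."))
>
> index("London")   // 0.0
> index("Bristol")  // throws SparkException: Unseen label: Bristol.
> {code}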
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>      |   ("A", "London", "StrA"),
>      |   ("B", "Bristol", null),
>      |   ("C", "New York", "StrC")
>      | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more field]
> scala> df.show
> +---+--------+-------+
> | ID| CITY|CONTENT|
> +---+--------+-------+
> | A| London| StrA|
> | B| Bristol| null|
> | C|New York| StrC|
> +---+--------+-------+
> scala> // then we remove the row with null in the CONTENT column, which removes Bristol
> scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---+--------+-------+
> | ID| CITY|CONTENT|
> +---+--------+-------+
> | A| London| StrA|
> | C|New York| StrC|
> +---+--------+-------+
> scala> // now create a StringIndexer for the CITY column and fit it to dfNoBristol
> scala> val model = {
>      |   new StringIndexer()
>      |     .setInputCol("CITY")
>      |     .setOutputCol("CITYIndexed")
>      |     .fit(dfNoBristol)
>      | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa23333fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> model.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 more fields]
> scala> dfWithIndex.show
> +---+--------+-------+-----------+
> | ID| CITY|CONTENT|CITYIndexed|
> +---+--------+-------+-----------+
> | A| London| StrA| 0.0|
> | C|New York| StrC| 1.0|
> +---+--------+-------+-----------+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` on `CITYIndexed`
> equal to 1.0 and perform an action. The `indexer` UDF in the `transform`
> method throws an exception reporting the unseen label "Bristol". From the API
> user's point of view this makes no sense, because no "Bristol" value appears
> anywhere when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Unseen label: Bristol. To handle unseen labels, set Param handleInvalid to keep.
>   at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:222)
>   at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:208)
>   ... 13 more
> {code}
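> As the exception message itself suggests, one workaround (in versions where
> the `keep` option is available) is to set the `handleInvalid` Param to
> "keep": unseen labels then receive an extra index instead of triggering an
> exception, so the pushed-down predicate can no longer fail. A sketch:
> {code:java}
> val keepModel = new StringIndexer()
>   .setInputCol("CITY")
>   .setOutputCol("CITYIndexed")
>   .setHandleInvalid("keep")   // unseen labels map to an extra index
>   .fit(dfNoBristol)
>
> keepModel.transform(dfNoBristol)
>   .filter($"CITYIndexed" === 1.0)
>   .count   // completes without error
> {code}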
> To understand what is happening here, note that an action is triggered when
> we call `StringIndexer.fit()`, before the `$"CITYIndexed" === 1.0` filter is
> applied, so the StringIndexerModel sees only London and New York, as
> expected. Now compare the query plans for `dfWithIndex` and
> `dfWithIndex.filter($"CITYIndexed" === 1.0)`:
> {noformat}
> scala> dfWithIndex.explain
> == Physical Plan ==
> *Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
> +- *Filter isnotnull(_3#5)
>    +- LocalTableScan [_1#3, _2#4, _3#5]
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).explain
> == Physical Plan ==
> *Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
> +- *Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))
>    +- LocalTableScan [_1#3, _2#4, _3#5]
> {noformat}
> Note that in the latter plan, the optimizer has pushed the filter
> `$"CITYIndexed" === 1.0` down to the same stage as our null filter (`Filter
> (isnotnull(_3#5) && (UDF(_2#4) = 1.0))`).
> With a debugger I have seen that both operands of the `&&` are evaluated on
> each row of `df`: `isnotnull(_3#5)` and `UDF(_2#4) = 1.0`. Therefore, the UDF
> is passed the label "Bristol" even though `isnotnull` returns false for that row.
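> As an aside, Spark 2.3 (the Fix For version) adds
> `UserDefinedFunction.asNondeterministic()`, which, as I understand it, stops
> Catalyst from pushing predicates through a UDF. A sketch with a hypothetical
> UDF of my own, not the Spark ML internals:
> {code:java}
> import org.apache.spark.sql.functions.udf
>
> val labelToIndex = Map("London" -> 0.0, "New York" -> 1.0)
>
> val guardedIndexer = udf { label: String =>
>   labelToIndex.getOrElse(label, throw new RuntimeException(s"Unseen label: $label."))
> }.asNondeterministic()   // Catalyst should now keep this call after the null filter
> {code}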
> If we cache the DataFrame `dfNoBristol` immediately after creating it (as
> sketched below), the error goes away, because the pushed-down predicate is
> then evaluated only against the cached, already-filtered rows, so the
> optimizer never calls the UDF on the unseen label. The fact that we get
> different results depending on whether or not we call `cache` is a cause for
> concern.
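> A sketch of that caching workaround, using the same data as above (fresh
> names to keep it self-contained):
> {code:java}
> val dfNoBristolCached = df.filter($"CONTENT".isNotNull).cache()
> dfNoBristolCached.count   // force materialization of the filtered rows
>
> val cachedModel = new StringIndexer()
>   .setInputCol("CITY")
>   .setOutputCol("CITYIndexed")
>   .fit(dfNoBristolCached)
>
> cachedModel.transform(dfNoBristolCached)
>   .filter($"CITYIndexed" === 1.0)
>   .count   // completes without the "Unseen label" exception
> {code}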
> I have seen similar issues with pure Spark SQL DataFrame operations when the
> DAG gets complicated (many joins and aggregations). Those are harder to
> reduce to such a simple example, but I plan to report them in the near
> future.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)