[ https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730287#comment-17730287 ]

Ritika Maheshwari commented on SPARK-43514:
-------------------------------------------

I had deleted the comment because it was incorrect. I could reproduce the issue 
in 3.3.0 as well. Once we exclude the ConvertToLocalRelation rule, it works in 
both 3.3.0 and 3.4.0.

By excluding this rule, the Filter clause does not go through the optimization 
rules, and it was the Filter clause that was throwing the NullPointerException.
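
For reference, a minimal sketch of how the rule can be excluded at runtime via 
the {{spark.sql.optimizer.excludedRules}} configuration (it can also be passed 
as a --conf at submit time):

{code:scala}
// Exclude ConvertToLocalRelation from the Catalyst optimizer for this session
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
{code}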

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-43514
>                 URL: https://issues.apache.org/jira/browse/SPARK-43514
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, SQL
>    Affects Versions: 3.3.2, 3.4.0
>         Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>            Reporter: Svyatoslav Semenyuk
>            Priority: Major
>              Labels: ml, sql
>         Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on a common column using some 
> similarity measure. All of the following code is in Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, MinHashLSHModel, NGram, RegexTokenizer}
> import org.apache.spark.sql.{Column, DataFrame}
> import org.apache.spark.sql.functions.{col, struct}
> /**
>  * Joins two dataframes on a string column using the LSH algorithm for similarity computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from the datasets;
>  *     the lower the `distance` value, the more similar the `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   // Tokenize into characters, build 3-grams, hash to term-frequency vectors, then MinHash LSH
>   val pipeline = new Pipeline().setStages(Array(
>     new RegexTokenizer()
>       .setPattern("")
>       .setMinTokenLength(1)
>       .setInputCol(joinColumn)
>       .setOutputCol("tokens"),
>     new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
>     new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
>     new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as c
>   val result = model
>     .stages
>     .last
>     .asInstanceOf[MinHashLSHModel]
>     .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
>     .withColumn("datasetA", struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
>     .withColumn("datasetB", struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider the following simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add the 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x0000000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> ----
> Now let's take a look at an example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
>     val companies = df.select($"id", $"name")
>     val directors = df
>             .select(explode($"directors"))
>             .select($"col.name", $"col.id")
>             .dropDuplicates("id")
>     val toBeMatched1 = companies
>             .filter(length($"name") > 2)
>             .select(
>                 $"name",
>                 $"id" as "sourceLegalEntityId",
>             )
>     val toBeMatched2 = directors
>             .filter(length($"name") > 2)
>             .select(
>                 $"name",
>                 $"id" as "directorId",
>             )
>     similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
> }
> def process2(df: DataFrame): Unit = {
>     def process_financials(column: Column): Column = {
>         transform(
>             column,
>             x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
>         )
>     }
>     val companies = df.select(
>         $"id",
>         $"name",
>         struct(
>             process_financials($"financials.balanceSheet") as "balanceSheet",
>             process_financials($"financials.capitalAndReserves") as "capitalAndReserves",
>         ) as "financials",
>     )
>     val directors = df
>             .select(explode($"directors"))
>             .select($"col.name", $"col.id")
>             .dropDuplicates("id")
>     val toBeMatched1 = companies
>             .filter(length($"name") > 2)
>             .select(
>                 $"name",
>                 $"id" as "sourceLegalEntityId",
>             )
>     val toBeMatched2 = directors
>             .filter(length($"name") > 2)
>             .select(
>                 $"name",
>                 $"id" as "directorId",
>             )
>     similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
> }
> {code}
> Function {{process2}} does the same job as {{process1}}, but also performs some 
> transformations on the {{financials}} column before executing the similarity join.
> Example data frame and its schema:
> {code:scala}
> import org.apache.spark.sql.types._
> val schema = StructType(
>     Seq(
>         StructField("id", StringType),
>         StructField("name", StringType),
>         StructField(
>             "directors",
>             ArrayType(
>                 StructType(Seq(StructField("id", StringType), StructField("name", StringType))),
>                 containsNull = true,
>             ),
>         ),
>         StructField(
>             "financials",
>             StructType(
>                 Seq(
>                     StructField(
>                         "balanceSheet",
>                         ArrayType(
>                             StructType(Seq(
>                                 StructField("date", StringType),
>                                 StructField("value", StringType),
>                             )),
>                             containsNull = true,
>                         ),
>                     ),
>                     StructField(
>                         "capitalAndReserves",
>                         ArrayType(
>                             StructType(Seq(
>                                 StructField("date", StringType),
>                                 StructField("value", StringType),
>                             )),
>                             containsNull = true,
>                         ),
>                     ),
>                 ),
>             ),
>         ),
>     )
> )
> val mainDF = (1 to 10)
>         .toDF("data")
>         .withColumn("data", lit(null) cast schema)
>         .select("data.*")
> {code}
> This code just makes a data frame with 10 rows of a null column cast to the 
> specified schema.
> Now let's pass {{mainDF}} to the previously defined functions and observe the results.
> Example 1:
> {code:scala}
> process1(mainDF)
> {code}
> Outputs three empty DFs, no errors.
> Example 2:
> {code:scala}
> process1(mainDF.distinct())
> {code}
> Outputs two empty DFs and then fails at {{result.show(false)}}. Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => 
> array<string>).
>   ... many elided
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
>   ... many more
> {code}
> Example 3:
> {code:scala}
> process2(mainDF)
> {code}
> Outputs two empty DFs and then fails at {{result.show(false)}}. Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => 
> array<string>).
>   ... many elided
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
>   ... many more
> {code}
> Somehow the presence of the {{distinct}} DF method or the {{transform}} (or 
> {{to_timestamp}}) SQL function before executing the similarity join causes it to 
> fail on empty input data frames. If these operations are done after the join, 
> then no errors are emitted (see the sketch below).
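> A minimal sketch (reusing the first example above) of applying the deduplication 
> after the join instead of before it:
> {code:scala}
> // Per the observation above, calling distinct() on the join result raises no error
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> val joined = similarityJoin(inputDF1, inputDF2, "name", 0.6)
> joined.distinct().show(false)
> {code}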
> ----
> In the examples above I used trivial data frames for testing, and by design of 
> these examples {{similarityJoin}} gets empty DFs. They are empty because the join 
> column is preemptively cleared of null values by {{filter(length($"name") > 2)}}. 
> We had the same issue with {{RegexTokenizer}} raising {{NullPointerException}} 
> on real data where the data frames were not empty after an identical filter 
> statement. It is really strange to get a {{NullPointerException}} for DFs which 
> do not have null values in the join column.
> Current workaround: -call the {{distinct}} DF method and the {{transform}} SQL 
> function after the similarity join.- Apparently, adding {{.cache()}} to the 
> source DF resolves the issue (see the sketch below).
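> A minimal sketch of how the {{.cache()}} workaround could look on the first failing 
> example; the exact placement of {{.cache()}} here is an assumption:
> {code:scala}
> // Caching the deduplicated and filtered input reportedly avoids the FAILED_EXECUTE_UDF error
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2).cache() as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}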


