[ https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730623#comment-17730623 ]
Ritika Maheshwari commented on SPARK-43514:
-------------------------------------------

Please refer to the Spark Properties section in the documentation to see how to set the excludeRule property on SparkConf: https://spark.apache.org/docs/latest/configuration.html#spark-properties

Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions
-----------------------------------------------------------------------------------------------------------------------

                 Key: SPARK-43514
                 URL: https://issues.apache.org/jira/browse/SPARK-43514
             Project: Spark
          Issue Type: Bug
          Components: ML, SQL
    Affects Versions: 3.3.2, 3.4.0
         Environment: Scala version: 2.12.17
                      Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
                      Spark 3.3.2 deployed on a cluster was used to check the issue on real data.
            Reporter: Svyatoslav Semenyuk
            Priority: Major
              Labels: ml, sql
         Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, Test.scala


We designed a function that joins two DFs on a common column by similarity. All the code below is Scala 2.12. I've added {{show}} calls for demonstration purposes.

{code:scala}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, RegexTokenizer, MinHashLSHModel}
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.functions.{col, struct}

/**
 * Joins two dataframes on a string column using the LSH algorithm for similarity computation.
 *
 * The output dataframe has three columns:
 *
 *  - `datasetA`, data from the left dataframe.
 *  - `datasetB`, data from the right dataframe.
 *  - `distance`, abstract distance between values of `joinColumn` from the datasets;
 *    the lower the `distance` value, the more similar the `joinColumn` values are.
 */
def similarityJoin(
    leftDf: DataFrame,
    rightDf: DataFrame,
    joinColumn: String,
    threshold: Double = 0.8,
): DataFrame = {
  leftDf.show(false)
  rightDf.show(false)

  val pipeline = new Pipeline().setStages(Array(
    new RegexTokenizer()
      .setPattern("")
      .setMinTokenLength(1)
      .setInputCol(joinColumn)
      .setOutputCol("tokens"),
    new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
    new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
    new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
  ))

  val model = pipeline.fit(leftDf)

  val storedHashed = model.transform(leftDf)
  val landedHashed = model.transform(rightDf)

  def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as c

  val result = model
    .stages
    .last
    .asInstanceOf[MinHashLSHModel]
    .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
    .withColumn("datasetA", struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
    .withColumn("datasetB", struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))

  result.show(false)

  result
}
{code}

Now consider this simple example:

{code:scala}
val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}

This example runs with no errors and outputs 3 empty DFs.
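For completeness, here is a quick sanity check (added for illustration only) of why both inputs end up empty: {{length}} returns null for a null name and 0 for an empty one, so the filter drops both rows, and the pipeline UDFs inside {{similarityJoin}} should never see them.

{code:scala}
// Sanity check: both the empty string and the null row are removed by the filter.
// Assumes a Zeppelin/spark-shell session where `spark` and its implicits are available.
import org.apache.spark.sql.functions.length
import spark.implicits._

val checked = Seq("", null).toDF("name").filter(length($"name") > 2)
assert(checked.count() == 0)  // length(null) is null and length("") is 0, so neither row passes
{code}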
Let's add the {{distinct}} method to one of the data frames:

{code:scala}
val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}

This example outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3769/0x0000000101804840: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
  ... many elided
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
  at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
  ... many more
{code}

----

Now let's look at an example that is close to our application code. Define some helper functions:

{code:scala}
import org.apache.spark.sql.functions._

def process1(df: DataFrame): Unit = {
  val companies = df.select($"id", $"name")

  val directors = df
    .select(explode($"directors"))
    .select($"col.name", $"col.id")
    .dropDuplicates("id")

  val toBeMatched1 = companies
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "sourceLegalEntityId",
    )

  val toBeMatched2 = directors
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "directorId",
    )

  similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}

def process2(df: DataFrame): Unit = {
  def process_financials(column: Column): Column = {
    transform(
      column,
      x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
    )
  }

  val companies = df.select(
    $"id",
    $"name",
    struct(
      process_financials($"financials.balanceSheet") as "balanceSheet",
      process_financials($"financials.capitalAndReserves") as "capitalAndReserves",
    ) as "financials",
  )

  val directors = df
    .select(explode($"directors"))
    .select($"col.name", $"col.id")
    .dropDuplicates("id")

  val toBeMatched1 = companies
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "sourceLegalEntityId",
    )

  val toBeMatched2 = directors
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "directorId",
    )

  similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}
{code}

Function {{process2}} does the same job as {{process1}}, but additionally applies some transforms to the {{financials}} column before executing the similarity join.
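Before moving on, here is my guess (an assumption, not a verified diagnosis) at the mechanism behind the first error: an empty {{name}} produces no 3-grams, so {{HashingTF}} emits an all-zero vector, which {{MinHashLSHModel}} refuses to hash. A minimal sketch that should reproduce the same "Must have at least 1 non zero entry" failure directly:

{code:scala}
// Sketch (my assumption about the failure mechanism, not part of the original report):
// feed an empty string straight through the same pipeline stages.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, RegexTokenizer}
import spark.implicits._

val emptyName = Seq("").toDF("name")

val fitted = new Pipeline().setStages(Array(
  new RegexTokenizer().setPattern("").setMinTokenLength(1).setInputCol("name").setOutputCol("tokens"),
  new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
  new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
  new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
)).fit(emptyName)

// "" yields no tokens of length >= 1, hence no 3-grams and an all-zero vector,
// so this is expected to fail inside MinHashLSHModel.hashFunction.
fitted.transform(emptyName).show()
{code}

If this reading is correct, adding {{distinct}} somehow lets a row that the filter should have removed reach the ML UDFs.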
Example data frame and its schema:

{code:scala}
import org.apache.spark.sql.types._

val schema = StructType(
  Seq(
    StructField("id", StringType),
    StructField("name", StringType),
    StructField(
      "directors",
      ArrayType(
        StructType(Seq(StructField("id", StringType), StructField("name", StringType))),
        containsNull = true,
      ),
    ),
    StructField(
      "financials",
      StructType(
        Seq(
          StructField(
            "balanceSheet",
            ArrayType(
              StructType(Seq(
                StructField("date", StringType),
                StructField("value", StringType),
              )),
              containsNull = true,
            ),
          ),
          StructField(
            "capitalAndReserves",
            ArrayType(
              StructType(Seq(
                StructField("date", StringType),
                StructField("value", StringType),
              )),
              containsNull = true,
            ),
          ),
        ),
      ),
    ),
  )
)

val mainDF = (1 to 10)
  .toDF("data")
  .withColumn("data", lit(null) cast schema)
  .select("data.*")
{code}

This code just makes a data frame with 10 rows of a null column cast to the specified schema.

Now let's pass {{mainDF}} to the previously defined functions and observe the results.

Example 1:

{code:scala}
process1(mainDF)
{code}

Outputs three empty DFs, no errors.

Example 2:

{code:scala}
process1(mainDF.distinct())
{code}

Outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}

Example 3:

{code:scala}
process2(mainDF)
{code}

Outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}

Somehow, the presence of the {{distinct}} DF method or the {{transform}} (or {{to_timestamp}}) SQL function before the similarity join causes it to fail on empty input data frames. If these operations are performed after the join, no errors are emitted.

----

In the examples above I used trivial data frames for testing, and by design of these examples {{similarityJoin}} receives empty DFs. They are empty because the join column is preemptively cleared of null values by {{filter(length($"name") > 2)}}. We had the same issue with {{RegexTokenizer}} raising a {{NullPointerException}} on real data, where the data frames were not empty after the identical filter statement. It is really strange to get a {{NullPointerException}} for DFs that have no null values in the join column.

Current workaround: -call the {{distinct}} DF method and the {{transform}} SQL function after the similarity join.- Apparently, adding {{.cache()}} to the source DF resolves the issue.
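To make the workaround concrete, here is a sketch of how we apply it; the exact placement of {{.cache()}} (on the source DF, before the processing calls) is the only assumption here.

{code:scala}
// Workaround sketch: cache the source DF before the calls that used to fail.
val cachedDF = mainDF.cache()

process1(cachedDF.distinct())  // no longer fails with NullPointerException
process2(cachedDF)             // no longer fails with NullPointerException
{code}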