[ https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Svyatoslav Semenyuk updated SPARK-43514:
----------------------------------------
    Description: 
We designed a function that joins two DFs on a common column using a similarity measure. All of the following code is in Scala 2.12.

I've added {{show}} calls for demonstration purposes.

{code:scala}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, MinHashLSHModel, NGram, RegexTokenizer}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}

/**
 * Joins two dataframes on a string column using the LSH algorithm for similarity computation.
 *
 * The output dataframe has three columns:
 *
 *   - `datasetA`, data from the left dataframe.
 *   - `datasetB`, data from the right dataframe.
 *   - `distance`, the abstract distance between the values of `joinColumn` from the two datasets;
 *     the lower the `distance`, the more similar the `joinColumn` values are.
 */
def similarityJoin(
  leftDf: DataFrame,
  rightDf: DataFrame,
  joinColumn: String,
  threshold: Double = 0.8,
): DataFrame = {
  leftDf.show(false)
  rightDf.show(false)

  // Split the join column into single characters, build character 3-grams,
  // hash them into sparse term-frequency vectors and compute MinHash LSH signatures.
  val pipeline = new Pipeline().setStages(Array(
    new RegexTokenizer()
      .setPattern("")
      .setMinTokenLength(1)
      .setInputCol(joinColumn)
      .setOutputCol("tokens"),
    new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
    new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
    new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
  ))

  val model = pipeline.fit(leftDf)

  val storedHashed = model.transform(leftDf)
  val landedHashed = model.transform(rightDf)

  def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as c

  // Approximate join on the MinHash signatures: keep pairs whose distance is below
  // `threshold`, then rebuild datasetA/datasetB as structs of the original input columns only.
  val result = model
    .stages
    .last
    .asInstanceOf[MinHashLSHModel]
    .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
    .withColumn("datasetA", struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
    .withColumn("datasetB", struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))

  result.show(false)

  result
}
{code}


Now consider the following simple example (it assumes {{spark.implicits._}} and {{org.apache.spark.sql.functions._}} are in scope):

{code:scala}
val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}

This example runs with no errors and outputs three empty DFs. Now let's add a {{distinct}} call to one of the data frames:

{code:scala}
val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}

This example outputs two empty DFs and then fails at {{result.show(false)}}. 
Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3769/0x0000000101804840: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
  ... many elided
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
  at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
  ... many more
{code}

----

Now let's look at an example that is closer to our application code. First, define some helper functions:

{code:scala}
import org.apache.spark.sql.functions._


def process1(df: DataFrame): Unit = {
    val companies = df.select($"id", $"name")

    val directors = df
            .select(explode($"directors"))
            .select($"col.name", $"col.id")
            .dropDuplicates("id")

    val toBeMatched1 = companies
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "sourceLegalEntityId",
            )

    val toBeMatched2 = directors
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "directorId",
            )

    similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}

def process2(df: DataFrame): Unit = {
    def process_financials(column: Column): Column = {
        transform(
            column,
            x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
        )
    }

    val companies = df.select(
        $"id",
        $"name",
        struct(
            process_financials($"financials.balanceSheet") as "balanceSheet",
            process_financials($"financials.capitalAndReserves") as 
"capitalAndReserves",
        ) as "financials",
    )

    val directors = df
            .select(explode($"directors"))
            .select($"col.name", $"col.id")
            .dropDuplicates("id")

    val toBeMatched1 = companies
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "sourceLegalEntityId",
            )

    val toBeMatched2 = directors
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "directorId",
            )

    similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}
{code}

Function {{process2}} does the same job as {{process1}}, but it also applies some transformations to the {{financials}} column before executing the similarity join.

Example data frame and its schema:

{code:scala}
import org.apache.spark.sql.types._

val schema = StructType(
    Seq(
        StructField("id", StringType),
        StructField("name", StringType),
        StructField(
            "directors",
            ArrayType(
                StructType(Seq(StructField("id", StringType), StructField("name", StringType))),
                containsNull = true,
            ),
        ),
        StructField(
            "financials",
            StructType(
                Seq(
                    StructField(
                        "balanceSheet",
                        ArrayType(
                            StructType(Seq(
                                StructField("date", StringType),
                                StructField("value", StringType),
                            )),
                            containsNull = true,
                        ),
                    ),
                    StructField(
                        "capitalAndReserves",
                        ArrayType(
                            StructType(Seq(
                                StructField("date", StringType),
                                StructField("value", StringType),
                            )),
                            containsNull = true,
                        ),
                    ),
                ),
            ),
        ),
    )
)

val mainDF = (1 to 10)
        .toDF("data")
        .withColumn("data", lit(null) cast schema)
        .select("data.*")
{code}

This code simply builds a data frame with 10 rows in which a null literal is cast to the specified schema, so every column in every row is null.

Now let's pass {{mainDF}} to the previously defined functions and observe the results.

Example 1:
{code:scala}
process1(mainDF)
{code}
Outputs three empty DFs, no errors.

Example 2:
{code:scala}
process1(mainDF.distinct())
{code}
Outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}

Example 3:
{code:scala}
process2(mainDF)
{code}
Outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}

Somehow, the presence of the {{distinct}} DataFrame method, or of the {{transform}} (or {{to_timestamp}}) SQL function, before the similarity join causes the join to fail even on empty input data frames. If these operations are performed after the join, no errors are emitted (see the sketch below).
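
For illustration, a minimal sketch of what "performed after the join" means, using the simple example from above (assuming the same {{similarityJoin}} definition and imports):

{code:scala}
// Sketch: distinct() is applied to the join result instead of to an input DF.
// Per the observation above, this ordering does not emit errors.
val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6).distinct()
{code}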

----

In the examples above I used trivial data frames for testing, and by design of these examples {{similarityJoin}} receives empty DFs. They are empty because the join column is preemptively cleared of null values by {{filter(length($"name") > 2)}}. We hit the same issue, with {{RegexTokenizer}} raising a {{NullPointerException}}, on real data where the data frames were not empty after the identical filter. It is really strange to get a {{NullPointerException}} for DFs that have no null values in the join column.

Current workaround: -call the {{distinct}} DF method and the {{transform}} SQL function after the similarity join.- Apparently, adding {{.cache()}} to the source DF resolves the issue.
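
To make the workaround concrete, here is a minimal sketch of the failing simple example with {{.cache()}} added to the source DF (assuming "source DF" means the DF on which {{distinct}} is later called, and the same {{similarityJoin}} definition and imports as above):

{code:scala}
// Sketch of the current workaround: cache the source DF before distinct()/filter().
// Without the cache() call this is the failing example shown earlier.
val inputDF1 = Seq("", null).toDF("name")
  .cache()                       // <- workaround: materialize the source DF
  .distinct()
  .filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}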

  was:
We designed a function that joins two DFs on common column with some 
similarity. All next code will be on Scala 2.12.

I've added {{show}} calls for demonstration purposes.

{code:scala}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
RegexTokenizer, MinHashLSHModel}
import org.apache.spark.sql.{DataFrame, Column}

/**
 * Joins two data frames on a string column using LSH algorithm
 * for similarity computation.
 *
 * If input data frames have columns with identical names,
 * the resulting dataframe will have columns from them both
 * with prefixes `datasetA` and `datasetB` respectively.
 *
 * For example, if both dataframes have a column with name `myColumn`,
 * then the result will have columns `datasetAMyColumn` and `datasetBMyColumn`.
 */
def similarityJoin(
        df: DataFrame,
        anotherDf: DataFrame,
        joinExpr: String,
        threshold: Double = 0.8,
): DataFrame = {
    df.show(false)
    anotherDf.show(false)

    val pipeline = new Pipeline().setStages(Array(
        new RegexTokenizer()
                .setPattern("")
                .setMinTokenLength(1)
                .setInputCol(joinExpr)
                .setOutputCol("tokens"),
        new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
        new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
        new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
    )
    )

    val model = pipeline.fit(df)

    val storedHashed = model.transform(df)
    val landedHashed = model.transform(anotherDf)

    val commonColumns = df.columns.toSet & anotherDf.columns.toSet

    /**
     * Converts column name from a data frame to the column of resulting 
dataset.
     */
    def convertColumn(datasetName: String)(columnName: String): Column = {
        val newName =
            if (commonColumns.contains(columnName)) 
s"$datasetName${columnName.capitalize}"
            else columnName

        col(s"$datasetName.$columnName") as newName
    }

    val columnsToSelect = df.columns.map(convertColumn("datasetA")) ++
                          anotherDf.columns.map(convertColumn("datasetB"))

    val result = model
            .stages
            .last
            .asInstanceOf[MinHashLSHModel]
            .approxSimilarityJoin(storedHashed, landedHashed, threshold, 
"confidence")
            .select(columnsToSelect.toSeq: _*)

    result.show(false)

    result
}
{code}


Now consider such simple example:

{code:scala}
val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}

This example runs with no errors and outputs 3 empty DFs. Let's add 
{{distinct}} method to one data frame:

{code:scala}
val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}

This example outputs two empty DFs and then fails at {{result.show(false)}}. 
Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
defined function (LSHModel$$Lambda$3769/0x0000000101804840: 
(struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
  ... many elided
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at 
least 1 non zero entry.
  at scala.Predef$.require(Predef.scala:281)
  at 
org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
  at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
  ... many more
{code}

----

Now let's take a look on the example which is close to our application code. 
Define some helper functions:

{code:scala}
import org.apache.spark.sql.functions._


def process1(df: DataFrame): Unit = {
    val companies = df.select($"id", $"name")

    val directors = df
            .select(explode($"directors"))
            .select($"col.name", $"col.id")
            .dropDuplicates("id")

    val toBeMatched1 = companies
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "sourceLegalEntityId",
            )

    val toBeMatched2 = directors
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "directorId",
            )

    similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}

def process2(df: DataFrame): Unit = {
    def process_financials(column: Column): Column = {
        transform(
            column,
            x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
        )
    }

    val companies = df.select(
        $"id",
        $"name",
        struct(
            process_financials($"financials.balanceSheet") as "balanceSheet",
            process_financials($"financials.capitalAndReserves") as 
"capitalAndReserves",
        ) as "financials",
    )

    val directors = df
            .select(explode($"directors"))
            .select($"col.name", $"col.id")
            .dropDuplicates("id")

    val toBeMatched1 = companies
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "sourceLegalEntityId",
            )

    val toBeMatched2 = directors
            .filter(length($"name") > 2)
            .select(
                $"name",
                $"id" as "directorId",
            )

    similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}
{code}

Function {{process2}} does the same job as {{process1}}, but also does some 
transforms on {{financials}} column before executing similarity join.

Example data frame and its schema:

{code:scala}
import org.apache.spark.sql.types._

val schema = StructType(
    Seq(
        StructField("id", StringType),
        StructField("name", StringType),
        StructField(
            "directors",
            ArrayType(
                StructType(Seq(StructField("id", StringType), 
StructField("name", StringType))),
                containsNull = true,
            ),
        ),
        StructField(
            "financials",
            StructType(
                Seq(
                    StructField(
                        "balanceSheet",
                        ArrayType(
                            StructType(Seq(
                                StructField("date", StringType),
                                StructField("value", StringType),
                            )
                            ),
                            containsNull = true,
                        ),
                    ),
                    StructField(
                        "capitalAndReserves",
                        ArrayType(
                            StructType(Seq(
                                StructField("date", StringType),
                                StructField("value", StringType),
                            )
                            ),
                            containsNull = true,
                        ),
                    ),
                ),
            ),
        ),
    )
)

val mainDF = (1 to 10)
        .toDF("data")
        .withColumn("data", lit(null) cast schema)
        .select("data.*")
{code}

This code just makes a data frame with 10 rows of null column casted to the 
specified schema.

Now let's pass {{mainDF}} to previously defined functions and observe results.

Example 1:
{code:scala}
process1(mainDF)
{code}
Outputs three empty DFs, no errors.

Example 2:
{code:scala}
process1(mainDF.distinct())
{code}
Outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => 
array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at 
org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}

Example 3:
{code:scala}
process2(mainDF)
{code}
Outputs two empty DFs and then fails at {{result.show(false)}}. Error:

{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => 
array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at 
org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}

Somehow presence of {{distinct}} DF method or {{transform}} (or 
{{to_timestamp}}) SQL function before executing similarity join causes it to 
fail on empty input data frames. If these operations are done after join, then 
no errors are emitted.

---

In the examples above I used trivia data frames for testing, and by design of 
these examples {{similarityJoin}} gets empty DFs. They are empty because join 
column is preventively cleared from null values by {{filter(length($"name") > 
2)}}. We had the same issue with {{RegexTokenizer}} raising 
{{NullPointerException}} with the real data where data frames were not empty 
after identical filter statement. This is really strange to get 
{{NullPointerException}} for DFs which do not have null values in join column.

Current workaround: -call {{distinct}} DF method and {{transform}} SQL function 
after similarity join.- apparently, adding `.cache()` to the source DF resolves 
the issue.


> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-43514
>                 URL: https://issues.apache.org/jira/browse/SPARK-43514
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, SQL
>    Affects Versions: 3.3.2, 3.4.0
>         Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>            Reporter: Svyatoslav Semenyuk
>            Priority: Major
>              Labels: ml, sql
>         Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Test.scala
>


