[ 
https://issues.apache.org/jira/browse/SPARK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Umar Asir updated SPARK-35291:
------------------------------
    Description: 
We are trying to merge data using DeltaTable's merge API. Inserting a null 
value into a not-null column results in a NullPointerException instead of 
throwing a constraint violation error.

 

*Code :*  
{code:java}
package com.uasir.cdw.delta
import org.apache.spark.sql._
import io.delta.tables._

object NotNullIssue {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "C:\\Tools\\hadoop\\")

    val spark = SparkSession
        .builder()
        .appName("DFMergeTest")
        .master("local[*]")
        .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.testing.memory", "571859200")
        .getOrCreate()

    println("Reading from the source table")
    val df = spark.read.format("delta").load("C:\\Input\\cdwqasourceupdate") 
\\PFA cdwqasourceupdate.7z
    df.show()

    println("Reading from the target table")
    val tgtDf = spark.read.format("delta").load("C:\\Input\\cdwqatgtupdate") 
\\PFA cdwqatgtupdate.7z
    tgtDf.show()

    val sourceTable = "source"
    val targetDataTable = "target"

    val colMap= scala.collection.mutable.Map[String,String]()

    val sourceFields = df.schema.fieldNames
    val targetFields = tgtDf.schema.fieldNames

    for ( i <- 0 until targetFields.length) {
      colMap(targetFields(i)) = sourceTable + "." + sourceFields(i)
    }

    /* colMap will be generated as :
    TGTID -> c1_ID
    TGT_NAME -> c2_NAME
    TGT_ADDRESS -> c3_address
    TGT_DOB -> c4_dob
    */

    println("update")
    DeltaTable.forPath(spark, "C:\\Input\\cdwqatgtupdate")
        .as(targetDataTable)
        .merge(
          df.as(sourceTable),
          targetDataTable + "." + "TGTID" + " = " + sourceTable + "." + "c1_ID" 
)
        .whenMatched()
        .updateExpr(colMap)
        .execute()
    println("Reading from target the table after operation")
    tgtDf.show()
  }

}

{code}
 

 

 

*Error*
{code:java}
Caused by: java.lang.RuntimeException: Error while decoding: 
java.lang.NullPointerExceptionCaused by: java.lang.RuntimeException: Error 
while decoding: java.lang.NullPointerExceptioncreateexternalrow(input[0, int, 
false], input[1, string, false].toString, input[2, string, true].toString, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
ObjectType(class java.sql.Date), toJavaDate, input[3, date, true], true, 
false), StructField(TGTID,IntegerType,false), 
StructField(TGT_NAME,StringType,false), 
StructField(TGT_ADDRESS,StringType,true), StructField(TGT_DOB,DateType,true)) 
at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
 at 
org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$9(MergeIntoCommand.scala:565)
 at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.run(Task.scala:127) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.lang.NullPointerException at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
 Source) at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
 ... 17 more
{code}
*PFA*
 # Sample program [^NotNullIssue.scala] ,
 # Source data (parquet  files) [^cdwqasourceupdate.7z] 
 # Target data (parquet  files) [^cdwqatgtupdate.7z]
 # Log [^run1.log]

 

 

  was:
We are trying to merge data using DeltaTable's merge API. Inserting a null 
value into a not-null column results in a NullPointerException instead of 
throwing a constraint violation error.

 

*Code :*  
{code:java}
package com.uasir.cdw.delta
import org.apache.spark.sql._
import io.delta.tables._

object NotNullIssue {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "C:\\Tools\\hadoop\\")

    val spark = SparkSession
        .builder()
        .appName("DFMergeTest")
        .master("local[*]")
        .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.testing.memory", "571859200")
        .getOrCreate()

    println("Reading from the source table")
    val df = spark.read.format("delta").load("C:\\Input\\cdwqasourceupdate") 
\\PFA cdwqasourceupdate.7z
    df.show()

    println("Reading from the target table")
    val tgtDf = spark.read.format("delta").load("C:\\Input\\cdwqatgtupdate") 
\\PFA cdwqatgtupdate.7z
    tgtDf.show()

    val sourceTable = "source"
    val targetDataTable = "target"

    val colMap= scala.collection.mutable.Map[String,String]()

    val sourceFields = df.schema.fieldNames
    val targetFields = tgtDf.schema.fieldNames

    for ( i <- 0 until targetFields.length) {
      colMap(targetFields(i)) = sourceTable + "." + sourceFields(i)
    }

    /* colMap will be generated as :
    TGTID -> c1_ID
    TGT_NAME -> c2_NAME
    TGT_ADDRESS -> c3_address
    TGT_DOB -> c4_dob
    */

    println("update")
    DeltaTable.forPath(spark, "C:\\Input\\cdwqatgtupdate")
        .as(targetDataTable)
        .merge(
          df.as(sourceTable),
          targetDataTable + "." + "TGTID" + " = " + sourceTable + "." + "c1_ID" 
)
        .whenMatched()
        .updateExpr(colMap)
        .execute()
    println("Reading from target the table after operation")
    tgtDf.show()
  }

}

{code}
 

 

 

*Error*
{noformat}
Caused by: java.lang.RuntimeException: Error while decoding: 
java.lang.NullPointerExceptionCaused by: java.lang.RuntimeException: Error 
while decoding: java.lang.NullPointerExceptioncreateexternalrow(input[0, int, 
false], input[1, string, false].toString, input[2, string, true].toString, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
ObjectType(class java.sql.Date), toJavaDate, input[3, date, true], true, 
false), StructField(TGTID,IntegerType,false), 
StructField(TGT_NAME,StringType,false), 
StructField(TGT_ADDRESS,StringType,true), StructField(TGT_DOB,DateType,true)) 
at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
 at 
org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$9(MergeIntoCommand.scala:565)
 at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.run(Task.scala:127) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.lang.NullPointerException at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
 Source) at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
 ... 17 more{noformat}
 

*PFA*
 # Sample program [^NotNullIssue.scala] ,
 # Source data (parquet  files) [^cdwqasourceupdate.7z] 
 # Target data (parquet  files) [^cdwqatgtupdate.7z]
 # Log [^run1.log]

 

 


> NullPointerException at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown
>  Source)
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35291
>                 URL: https://issues.apache.org/jira/browse/SPARK-35291
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.2
>            Reporter: Umar Asir
>            Priority: Major
>         Attachments: NotNullIssue.scala, cdwqasourceupdate.7z, 
> cdwqatgtupdate.7z, run1.log
>
>
> We are trying to merge data using DeltaTable's merge API. Inserting a null 
> value into a not-null column results in a NullPointerException instead of 
> throwing a constraint violation error.
>  
> *Code :*  
> {code:java}
> package com.uasir.cdw.delta
> import org.apache.spark.sql._
> import io.delta.tables._
> object NotNullIssue {
>   def main(args: Array[String]): Unit = {
>     System.setProperty("hadoop.home.dir", "C:\\Tools\\hadoop\\")
>     val spark = SparkSession
>         .builder()
>         .appName("DFMergeTest")
>         .master("local[*]")
>         .config("spark.sql.extensions", 
> "io.delta.sql.DeltaSparkSessionExtension")
>         .config("spark.sql.catalog.spark_catalog", 
> "org.apache.spark.sql.delta.catalog.DeltaCatalog")
>         .config("spark.testing.memory", "571859200")
>         .getOrCreate()
>     println("Reading from the source table")
>     val df = spark.read.format("delta").load("C:\\Input\\cdwqasourceupdate") 
> \\PFA cdwqasourceupdate.7z
>     df.show()
>     println("Reading from the target table")
>     val tgtDf = spark.read.format("delta").load("C:\\Input\\cdwqatgtupdate") 
> \\PFA cdwqatgtupdate.7z
>     tgtDf.show()
>     val sourceTable = "source"
>     val targetDataTable = "target"
>     val colMap= scala.collection.mutable.Map[String,String]()
>     val sourceFields = df.schema.fieldNames
>     val targetFields = tgtDf.schema.fieldNames
>     for ( i <- 0 until targetFields.length) {
>       colMap(targetFields(i)) = sourceTable + "." + sourceFields(i)
>     }
>     /* colMap will be generated as :
>     TGTID -> c1_ID
>     TGT_NAME -> c2_NAME
>     TGT_ADDRESS -> c3_address
>     TGT_DOB -> c4_dob
>     */
>     println("update")
>     DeltaTable.forPath(spark, "C:\\Input\\cdwqatgtupdate")
>         .as(targetDataTable)
>         .merge(
>           df.as(sourceTable),
>           targetDataTable + "." + "TGTID" + " = " + sourceTable + "." + 
> "c1_ID" )
>         .whenMatched()
>         .updateExpr(colMap)
>         .execute()
>     println("Reading from target the table after operation")
>     tgtDf.show()
>   }
> }
> {code}
>  
>  
>  
> *Error*
> {code:java}
> Caused by: java.lang.RuntimeException: Error while decoding: 
> java.lang.NullPointerExceptionCaused by: java.lang.RuntimeException: Error 
> while decoding: java.lang.NullPointerExceptioncreateexternalrow(input[0, int, 
> false], input[1, string, false].toString, input[2, string, true].toString, 
> staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
> ObjectType(class java.sql.Date), toJavaDate, input[3, date, true], true, 
> false), StructField(TGTID,IntegerType,false), 
> StructField(TGT_NAME,StringType,false), 
> StructField(TGT_ADDRESS,StringType,true), StructField(TGT_DOB,DateType,true)) 
> at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
>  at 
> org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$9(MergeIntoCommand.scala:565)
>  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
> scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source) at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
>  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.run(Task.scala:127) at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) 
> Caused by: java.lang.NullPointerException at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown
>  Source) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
>  Source) at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
>  ... 17 more
> {code}
> *PFA*
>  # Sample program [^NotNullIssue.scala] ,
>  # Source data (parquet  files) [^cdwqasourceupdate.7z] 
>  # Target data (parquet  files) [^cdwqatgtupdate.7z]
>  # Log [^run1.log]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to