[ https://issues.apache.org/jira/browse/SPARK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Umar Asir updated SPARK-35291:
------------------------------
Description:

We are trying to merge data using DeltaTable's merge API. Inserting a null value into a not-null column results in a NullPointerException instead of a constraint-violation error.

*Code:*
{code:java}
package com.rk.cdw.delta

import org.apache.spark.sql._
import io.delta.tables._

object NotNullIssue {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\Tools\\hadoop\\")

    val spark = SparkSession
      .builder()
      .appName("DFMergeTest")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.testing.memory", "571859200")
      .getOrCreate()

    println("Reading from the source table")
    val df = spark.read.format("delta").load("C:\\Input\\cdwqasourceupdate")
    df.show()

    println("Reading from the target table")
    val tgtDf = spark.read.format("delta").load("C:\\Input\\cdwqatgtupdate")
    tgtDf.show()

    val sourceTable = "source"
    val targetDataTable = "target"
    val sourceFields = df.schema.fieldNames
    val targetFields = tgtDf.schema.fieldNames

    // Map each target column to the positionally matching source column.
    val colMap = scala.collection.mutable.Map[String, String]()
    for (i <- 0 until targetFields.length) {
      colMap(targetFields(i)) = sourceTable + "." + sourceFields(i)
    }

    println("update")
    DeltaTable.forPath(spark, "C:\\Input\\cdwqatgtupdate")
      .as(targetDataTable)
      .merge(
        df.as(sourceTable),
        targetDataTable + "." + "TGTID" + " = " + sourceTable + "." + "c1_ID")
      .whenMatched()
      .updateExpr(colMap.toMap) // updateExpr expects an immutable Map
      .execute()

    println("Reading from the target table after the operation")
    tgtDf.show()
  }
}
{code}

{panel:title=Error}
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
	... 17 more
{panel}

Please find the sample program [^NotNullIssue.scala], source data as parquet files [^cdwqasourceupdate.7z], and target data [^cdwqatgtupdate.7z].

Attaching log [^run1.log]
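For reference, below is a minimal, self-contained sketch of the same scenario that does not need the attached data. The table path, schema, and column names are invented for illustration, and it assumes a Delta Lake version that supports NOT NULL columns; it is not claimed to be an exact reproduction of the attached program.

{code:java}
package com.rk.cdw.delta

import org.apache.spark.sql.SparkSession
import io.delta.tables.DeltaTable

object NotNullRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("NotNullRepro")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Hypothetical target table with a NOT NULL column.
    spark.sql("CREATE TABLE tgt (id INT, name STRING NOT NULL) USING delta LOCATION '/tmp/tgt'")
    spark.sql("INSERT INTO tgt VALUES (1, 'a')")

    // Source row carrying a NULL for the NOT NULL column.
    val src = spark.sql("SELECT 1 AS id, CAST(NULL AS STRING) AS name")

    // Expected: a constraint-violation error naming the column.
    // Observed (per this report): a NullPointerException from generated code.
    DeltaTable.forPath(spark, "/tmp/tgt")
      .as("t")
      .merge(src.as("s"), "t.id = s.id")
      .whenMatched()
      .updateExpr(Map("id" -> "s.id", "name" -> "s.name"))
      .execute()

    spark.stop()
  }
}
{code}

In both the attached program and this sketch, the expectation is a clear constraint-violation message rather than an NPE surfacing from SpecificSafeProjection.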
> NullPointerException at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown
> Source)
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35291
>                 URL: https://issues.apache.org/jira/browse/SPARK-35291
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.2
>            Reporter: Umar Asir
>            Priority: Major
>         Attachments: NotNullIssue.scala, cdwqasourceupdate.7z,
> cdwqatgtupdate.7z, run1.log
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org