pratyakshsharma commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r732206905
##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -847,20 +820,222 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
Row("j", 2, "RUSSIA"), Row("k", 0, "INDIA")))
}
- test("test all the merge APIs UPDATE, DELETE, UPSERT and INSERT") {
+ def prepareTarget(
+ isPartitioned: Boolean = false,
+ partitionedColumn: String = null
+ ): Dataset[Row] = {
sql("drop table if exists target")
- val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+ val initFrame = sqlContext.sparkSession.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
- initframe.write
- .format("carbondata")
- .option("tableName", "target")
- .mode(SaveMode.Overwrite)
- .save()
- val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
+
+ val writer = initFrame.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ if (isPartitioned) {
+ writer.option("partitionColumns", partitionedColumn)
+ }
+ writer.save()
+ sqlContext.read.format("carbondata").option("tableName", "target").load()
+ }
+
+ def prepareTargetWithThreeFields(
+ isPartitioned: Boolean = false,
+ partitionedColumn: String = null
+ ): Dataset[Row] = {
+ sql("drop table if exists target")
+ val initFrame = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 0, "CHINA"),
+ Row("b", 1, "INDIA"),
+ Row("c", 2, "INDIA"),
+ Row("d", 3, "US")
+ ).asJava,
+ StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType),
+ StructField("country", StringType))))
+
+ val writer = initFrame.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ if (isPartitioned) {
+ writer.option("partitionColumns", partitionedColumn)
+ }
+ writer.save()
+ sqlContext.read.format("carbondata").option("tableName", "target").load()
+ }
+
+ test("test schema enforcement") {
+ val target = prepareTarget()
+ var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "1", "ab"),
+ Row("d", "4", "de")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", StringType),
+ StructField("new_value", StringType))))
+ val properties = CarbonProperties.getInstance()
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "true"
+ )
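+ // the source carries an extra column "new_value"; with enforcement on it is
+ // not added to the target, and only the matching columns get updated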
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", "1"), Row("b", "1"), Row("c", "2"), Row("d", "4")))
+
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "true"
+ )
+
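+ // insert-deduplication is only valid with operation type INSERT, so this upsert must fail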
+ val exceptionCaught1 = intercept[MalformedCarbonCommandException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1, "ab"),
+ Row("d", 4, "de")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType),
+ StructField("new_value", StringType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+ assert(exceptionCaught1.getMessage
+ .contains(
+ "property CARBON_STREAMER_INSERT_DEDUPLICATE should " +
+ "only be set with operation type INSERT"))
+
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
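+ // the source schema is missing the target column "value"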
+ val exceptionCaught2 = intercept[CarbonSchemaException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1),
+ Row("d", 4)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("val", IntegerType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+ assert(exceptionCaught2.getMessage.contains("source schema does not contain field: value"))
+
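+ // the source sends "value" as LongType while the target column is StringType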
+ val exceptionCaught3 = intercept[CarbonSchemaException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1),
+ Row("d", 4)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", LongType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+
+ assert(exceptionCaught3.getMsg.contains("source schema has different " +
+ "data type for field: value"))
+
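+ // the source has both "key" and "Key", differing only in case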
+ val exceptionCaught4 = intercept[CarbonSchemaException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "1", "A"),
+ Row("d", "4", "D")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", StringType), StructField("Key", StringType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+
+ assert(exceptionCaught4.getMsg.contains("source schema has similar fields which " +
+ "differ only in case sensitivity: key"))
+ }
+
+ test("test schema evolution") {
+ val properties = CarbonProperties.getInstance()
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD, "value"
+ )
+ sql("drop table if exists target")
+ var target = prepareTargetWithThreeFields()
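+ // the source adds a new column "new_value"; with enforcement off it gets
+ // evolved into the target schema, as the checkAnswer below expects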
+ var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1, "ab", "china"),
+ Row("d", 4, "de", "china"),
+ Row("d", 7, "updated_de", "china_pro")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType),
+ StructField("new_value", StringType),
+ StructField("country", StringType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", 1, "china", "ab"), Row("b", 1, "INDIA", null),
+ Row("c", 2, "INDIA", null), Row("d", 7, "china_pro", "updated_de")))
+
+ target = sqlContext.read.format("carbondata").option("tableName", "target").load()
+
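+ // the source drops "country" and "new_value", so the target schema shrinks back to (key, value)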
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 5),
+ Row("d", 5)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", 5), Row("b", 1),
+ Row("c", 2), Row("d", 5)))
+
+// target = sqlContext.read.format("carbondata").option("tableName", "target").load()
+// cdc = sqlContext.sparkSession.createDataFrame(Seq(
+// Row("b", 50),
+// Row("d", 50)
##########
Review comment:
Need to add a test case for a data type change here. Will do that. Something along the lines of the sketch below:
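A rough sketch, reusing the helpers from this diff (the expected outcome for an incompatible type change is still open, so the final assertion is left as a placeholder):

```scala
test("test data type change with schema evolution") {
  CarbonProperties.getInstance().addProperty(
    CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false")
  val target = prepareTargetWithThreeFields()
  // "value" arrives as StringType while the target column is IntegerType
  val cdc = sqlContext.sparkSession.createDataFrame(Seq(
    Row("b", "50", "BRAZIL"),
    Row("d", "50", "PERU")
  ).asJava, StructType(Seq(StructField("key", StringType),
    StructField("value", StringType),
    StructField("country", StringType))))
  target.as("A").upsert(cdc.as("B"), "key").execute()
  // assert on the evolved schema / merged rows here once the expected
  // behaviour for incompatible type changes is decided
}
```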
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]