yihua commented on code in PR #11934:
URL: https://github.com/apache/hudi/pull/11934#discussion_r1774113092
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
Review Comment:
```suggestion
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,non_existent_field")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
Review Comment:
```suggestion
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"non_existent_field")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
Review Comment:
Could the test be parameterized for readability?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'tip_history.fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"tip_history.fake_field_name")
Review Comment:
```suggestion
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"tip_history.non_existent_field")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'tip_history.fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"tip_history.fake_field_name\" cannot be null or
empty."))
Review Comment:
Same comment here about fixing the exception message.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
Review Comment:
```suggestion
fail("should fail when the specified record key field does not exist")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
Review Comment:
```suggestion
inputDF1.write.format("hudi")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
Review Comment:
```suggestion
fail("should fail when the specified record key field does not exist")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
Review Comment:
```suggestion
case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"non_existent_field\" cannot be null or empty."))
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
Review Comment:
```suggestion
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.non_existent_field")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
Review Comment:
```suggestion
case e: Exception => assertTrue(containsErrorMessage(e, "Record key
field 'non_existent_field' does not exist in the input record"))
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java:
##########
@@ -153,7 +154,12 @@ public static String getRecordKey(GenericRecord record,
List<String> recordKeyFi
StringBuilder recordKey = new StringBuilder();
for (int i = 0; i < recordKeyFields.size(); i++) {
String recordKeyField = recordKeyFields.get(i);
- String recordKeyValue =
HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true,
consistentLogicalTimestampEnabled);
+ String recordKeyValue;
+ try {
+ recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record,
recordKeyField, false, consistentLogicalTimestampEnabled);
+ } catch (HoodieException e) {
+ throw new HoodieKeyException("Recordkey field '" + recordKeyField + "'
does not exist in the input record");
Review Comment:
```suggestion
throw new HoodieKeyException("Record key field '" + recordKeyField +
"' does not exist in the input record");
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
Review Comment:
```suggestion
inputDF1.write.format("hudi")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'tip_history.fake_field_name' does not exist in the input record"))
Review Comment:
```suggestion
case e: Exception => assertTrue(containsErrorMessage(e, "Record key
field 'tip_history.non_existent_field' does not exist in the input record"))
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'tip_history.fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
Review Comment:
```suggestion
inputDF1.write.format("hudi")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
Review Comment:
```suggestion
inputDF1.write.format("hudi")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
Review Comment:
```suggestion
fail("should fail when the specified record key field does not exist")
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -713,6 +714,69 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Test
+ def testMissingRecordkeyField(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
5)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "recordKey
value: \"null\" for field: \"fake_field_name\" cannot be null or empty."))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"_row_key,tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
+ } catch {
+ case e: Exception => assertTrue(containsErrorMessage(e, "Recordkey field
'tip_history.fake_field_name' does not exist in the input record"))
+ }
+
+ try {
+ inputDF1.write.format("org.apache.hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(),
"tip_history.fake_field_name")
+ .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ fail("should fail when fake field is provided for recordkey")
Review Comment:
Same here: use "should fail when the specified record key field does not exist" as the failure message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]