This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d888ebf031 [HUDI-7994] Support secondary index on nested fields (#12579)
5d888ebf031 is described below
commit 5d888ebf0318da70a723e458109304326de6b5b4
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Jan 7 21:09:33 2025 +0530
[HUDI-7994] Support secondary index on nested fields (#12579)
---
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 17 +++++--
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 49 +++++++++++++++++++
.../functional/TestSecondaryIndexPruning.scala | 57 ++++++++++++++++++++++
3 files changed, 118 insertions(+), 5 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 7e67e41581e..01b55898843 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -202,6 +202,7 @@ public class HoodieAvroUtils {
/**
   * Convert a given avro record to a JSON string. If the record contents are invalid, return the record.toString().
   * Use this method over {@link HoodieAvroUtils#avroToJsonString} when simply trying to print the record contents without any guarantees around their correctness.
+ *
* @param record The GenericRecord to convert
* @return a JSON string
*/
@@ -676,13 +677,19 @@ public class HoodieAvroUtils {
*/
  public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
String[] parts = fieldName.split("\\.");
- int i = 0;
- for (; i < parts.length; i++) {
+ Schema currentSchema = writeSchema;
+ for (int i = 0; i < parts.length; i++) {
String part = parts[i];
- Schema schema = writeSchema.getField(part).schema();
+ try {
+ // Resolve nullable/union schema to the actual schema
+        currentSchema = resolveNullableSchema(currentSchema.getField(part).schema());
- if (i == parts.length - 1) {
- return resolveNullableSchema(schema);
+ if (i == parts.length - 1) {
+ // Return the schema for the final part
+ return resolveNullableSchema(currentSchema);
+ }
+ } catch (Exception e) {
+        throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
}
}
    throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 4089de3154b..635e124ab3d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -173,6 +173,28 @@ public class TestHoodieAvroUtils {
+
"{\"name\":\"localTimestampMillisField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"},"
+
"{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
+ "]}";
+ // Define schema with a nested field containing a union type
+ private static final String NESTED_SCHEMA_WITH_UNION = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"NestedRecordWithUnion\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"student\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Student\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"firstname\", \"type\": [\"null\",
\"string\"], \"default\": null},\n"
+ + " {\"name\": \"lastname\", \"type\": [\"null\",
\"string\"], \"default\": null}\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
@Test
public void testPropsPresent() {
@@ -463,6 +485,33 @@ public class TestHoodieAvroUtils {
     assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(nestedSchema, "student.firstname"));
}
+ @Test
+ public void testGetNestedFieldSchemaWithUnion() {
+ Schema schema = new Schema.Parser().parse(NESTED_SCHEMA_WITH_UNION);
+ // Create a record for the schema
+ GenericRecord rec = new GenericData.Record(schema);
+    Schema studentSchema = schema.getField("student").schema().getTypes().get(1); // Resolve union schema for "student"
+ GenericRecord studentRecord = new GenericData.Record(studentSchema);
+ studentRecord.put("firstname", "John");
+ studentRecord.put("lastname", "Doe");
+ rec.put("student", studentRecord);
+
+ // Test nested field schema for "student.firstname"
+ Schema expectedFirstnameSchema = Schema.create(Schema.Type.STRING);
+    assertEquals(expectedFirstnameSchema, getNestedFieldSchemaFromWriteSchema(schema, "student.firstname"));
+
+ // Test nested field schema for "student.lastname"
+ Schema expectedLastnameSchema = Schema.create(Schema.Type.STRING);
+    assertEquals(expectedLastnameSchema, getNestedFieldSchemaFromWriteSchema(schema, "student.lastname"));
+
+ // Test nullable handling for "student" (entire field)
+    assertEquals(studentSchema, getNestedFieldSchemaFromWriteSchema(schema, "student"));
+
+ // Test exception for invalid nested field
+    Exception exception = assertThrows(HoodieException.class, () -> getNestedFieldSchemaFromWriteSchema(schema, "student.middleName"));
+    assertTrue(exception.getMessage().contains("Failed to get schema. Not a valid field name"));
+ }
+
@Test
public void testReWriteAvroRecordWithNewSchema() {
Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 5f633034e97..6061b4823d8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -1424,6 +1424,63 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}
+ @Test
+ def testSecondaryIndexWithNestedFields(): Unit = {
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ tableName += "test_secondary_index_with_nested_fields"
+
+ // Create table with nested fields
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | record_key_col string,
+ | name struct<first_name:string, last_name:string>,
+ | ts bigint
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true',
+         |  hoodie.datasource.write.payload.class = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+ | )
+ | location '$basePath'
+ """.stripMargin)
+ // insert initial records
+    spark.sql(s"insert into $tableName values('id1', named_struct('first_name', 'John', 'last_name', 'Doe'), 1)")
+    spark.sql(s"insert into $tableName values('id2', named_struct('first_name', 'Jane', 'last_name', 'Smith'), 2)")
+ // create secondary index on name.last_name field
+ spark.sql(s"create index idx_last_name on $tableName (name.last_name)")
+ // validate index creation
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+    assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_last_name"))
+ // validate index records
+ checkAnswer(s"select key from hudi_metadata('$basePath') where type=7")(
+ Seq(s"Doe${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id1"),
+ Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2")
+ )
+ // verify pruning
+    checkAnswer(s"select record_key_col, name.last_name, ts from $tableName where name.last_name = 'Doe'")(
+ Seq("id1", "Doe", 1)
+ )
+ // update nested field
+    spark.sql(s"update $tableName set name = named_struct('first_name', 'John', 'last_name', 'Brown') where record_key_col = 'id1'")
+ // validate updated index records
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+ Seq(s"Brown${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id1", false),
+ Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2", false)
+ )
+ // verify pruning
+    checkAnswer(s"select record_key_col, name.last_name, ts from $tableName where name.last_name = 'Brown'")(
+ Seq("id1", "Brown", 1)
+ )
+ }
private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
    assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))