This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d888ebf031 [HUDI-7994] Support secondary index on nested fields (#12579)
5d888ebf031 is described below

commit 5d888ebf0318da70a723e458109304326de6b5b4
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Jan 7 21:09:33 2025 +0530

    [HUDI-7994] Support secondary index on nested fields (#12579)
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 17 +++++--
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 49 +++++++++++++++++++
 .../functional/TestSecondaryIndexPruning.scala     | 57 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 7e67e41581e..01b55898843 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -202,6 +202,7 @@ public class HoodieAvroUtils {
   /**
    * Convert a given avro record to a JSON string. If the record contents are invalid, return the record.toString().
    * Use this method over {@link HoodieAvroUtils#avroToJsonString} when simply trying to print the record contents without any guarantees around their correctness.
+   *
    * @param record The GenericRecord to convert
    * @return a JSON string
    */
@@ -676,13 +677,19 @@ public class HoodieAvroUtils {
    */
   public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
     String[] parts = fieldName.split("\\.");
-    int i = 0;
-    for (; i < parts.length; i++) {
+    Schema currentSchema = writeSchema;
+    for (int i = 0; i < parts.length; i++) {
       String part = parts[i];
-      Schema schema = writeSchema.getField(part).schema();
+      try {
+        // Resolve nullable/union schema to the actual schema
+        currentSchema = resolveNullableSchema(currentSchema.getField(part).schema());
 
-      if (i == parts.length - 1) {
-        return resolveNullableSchema(schema);
+        if (i == parts.length - 1) {
+          // Return the schema for the final part
+          return resolveNullableSchema(currentSchema);
+        }
+      } catch (Exception e) {
+        throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
       }
     }
     throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 4089de3154b..635e124ab3d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -173,6 +173,28 @@ public class TestHoodieAvroUtils {
      + "{\"name\":\"localTimestampMillisField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"},"
      + "{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
       + "]}";
+  // Define schema with a nested field containing a union type
+  private static final String NESTED_SCHEMA_WITH_UNION = "{\n"
+      + "  \"type\": \"record\",\n"
+      + "  \"name\": \"NestedRecordWithUnion\",\n"
+      + "  \"fields\": [\n"
+      + "    {\n"
+      + "      \"name\": \"student\",\n"
+      + "      \"type\": [\n"
+      + "        \"null\",\n"
+      + "        {\n"
+      + "          \"type\": \"record\",\n"
+      + "          \"name\": \"Student\",\n"
+      + "          \"fields\": [\n"
+      + "            {\"name\": \"firstname\", \"type\": [\"null\", \"string\"], \"default\": null},\n"
+      + "            {\"name\": \"lastname\", \"type\": [\"null\", \"string\"], \"default\": null}\n"
+      + "          ]\n"
+      + "        }\n"
+      + "      ],\n"
+      + "      \"default\": null\n"
+      + "    }\n"
+      + "  ]\n"
+      + "}";
 
   @Test
   public void testPropsPresent() {
@@ -463,6 +485,33 @@ public class TestHoodieAvroUtils {
     assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(nestedSchema, "student.firstname"));
   }
 
+  @Test
+  public void testGetNestedFieldSchemaWithUnion() {
+    Schema schema = new Schema.Parser().parse(NESTED_SCHEMA_WITH_UNION);
+    // Create a record for the schema
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema studentSchema = schema.getField("student").schema().getTypes().get(1); // Resolve union schema for "student"
+    GenericRecord studentRecord = new GenericData.Record(studentSchema);
+    studentRecord.put("firstname", "John");
+    studentRecord.put("lastname", "Doe");
+    rec.put("student", studentRecord);
+
+    // Test nested field schema for "student.firstname"
+    Schema expectedFirstnameSchema = Schema.create(Schema.Type.STRING);
+    assertEquals(expectedFirstnameSchema, getNestedFieldSchemaFromWriteSchema(schema, "student.firstname"));
+
+    // Test nested field schema for "student.lastname"
+    Schema expectedLastnameSchema = Schema.create(Schema.Type.STRING);
+    assertEquals(expectedLastnameSchema, getNestedFieldSchemaFromWriteSchema(schema, "student.lastname"));
+
+    // Test nullable handling for "student" (entire field)
+    assertEquals(studentSchema, getNestedFieldSchemaFromWriteSchema(schema, "student"));
+
+    // Test exception for invalid nested field
+    Exception exception = assertThrows(HoodieException.class, () -> getNestedFieldSchemaFromWriteSchema(schema, "student.middleName"));
+    assertTrue(exception.getMessage().contains("Failed to get schema. Not a valid field name"));
+  }
+
   @Test
   public void testReWriteAvroRecordWithNewSchema() {
     Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 5f633034e97..6061b4823d8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -1424,6 +1424,63 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithNestedFields(): Unit = {
+    var hudiOpts = commonOpts
+    hudiOpts = hudiOpts ++ Map(
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+    tableName += "test_secondary_index_with_nested_fields"
+
+    // Create table with nested fields
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  record_key_col string,
+         |  name struct<first_name:string, last_name:string>,
+         |  ts bigint
+         |) using hudi
+         | options (
+         |  primaryKey ='record_key_col',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+         |  hoodie.enable.data.skipping = 'true',
+         |  hoodie.datasource.write.payload.class = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+         | )
+         | location '$basePath'
+      """.stripMargin)
+    // insert initial records
+    spark.sql(s"insert into $tableName values('id1', named_struct('first_name', 'John', 'last_name', 'Doe'), 1)")
+    spark.sql(s"insert into $tableName values('id2', named_struct('first_name', 'Jane', 'last_name', 'Smith'), 2)")
+    // create secondary index on name.last_name field
+    spark.sql(s"create index idx_last_name on $tableName (name.last_name)")
+    // validate index creation
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_last_name"))
+    // validate index records
+    checkAnswer(s"select key from hudi_metadata('$basePath') where type=7")(
+      Seq(s"Doe${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id1"),
+      Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2")
+    )
+    // verify pruning
+    checkAnswer(s"select record_key_col, name.last_name, ts from $tableName where name.last_name = 'Doe'")(
+      Seq("id1", "Doe", 1)
+    )
+    // update nested field
+    spark.sql(s"update $tableName set name = named_struct('first_name', 'John', 'last_name', 'Brown') where record_key_col = 'id1'")
+    // validate updated index records
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+      Seq(s"Brown${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id1", false),
+      Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2", false)
+    )
+    // verify pruning
+    checkAnswer(s"select record_key_col, name.last_name, ts from $tableName where name.last_name = 'Brown'")(
+      Seq("id1", "Brown", 1)
+    )
+  }
 
   private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))

Reply via email to