This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch codex/parquet-list-element-reader-fix
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 14a559abc0a39d6cf799dd7b7684c5635d385a2c
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Apr 24 13:01:56 2026 -0700

    Fix Parquet list element wrapper extraction
---
 .../parquet/ParquetAvroRecordExtractor.java        |   5 +
 .../parquet/ParquetNativeRecordExtractor.java      |   4 +-
 .../parquet/ParquetRecordExtractorUtils.java       |  65 ++++++++++
 .../ParquetListElementRecordReaderTest.java        | 142 +++++++++++++++++++++
 4 files changed, 214 insertions(+), 2 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
index 8d8f282eef1..8eb2915cada 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
@@ -37,6 +37,11 @@ public class ParquetAvroRecordExtractor extends 
AvroRecordExtractor {
     return handleDeprecatedTypes(convert(value), field);
   }
 
+  /**
+   * Converts a multi-value through {@link AvroRecordExtractor} and then hands the result to
+   * {@link ParquetRecordExtractorUtils#unwrapListElementMaps(Object[])}, which replaces single-entry
+   * {@code {"element": value}} list wrapper maps with their wrapped values when every non-null entry is such a map.
+   */
+  @Override
+  protected Object[] convertMultiValue(Object value) {
+    Object[] convertedValues = super.convertMultiValue(value);
+    return ParquetRecordExtractorUtils.unwrapListElementMaps(convertedValues);
+  }
+
   Object handleDeprecatedTypes(Object value, Schema.Field field) {
     Schema.Type avroColumnType = field.schema().getType();
     if (avroColumnType == Schema.Type.UNION) {
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index 4c8ac432bba..36de06d005c 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -182,9 +182,9 @@ public class ParquetNativeRecordExtractor extends 
BaseRecordExtractor<Group> {
       return new Object[0];
     }
     if (numValues == 1 && array[0] instanceof Object[]) {
-      return (Object[]) array[0];
+      return ParquetRecordExtractorUtils.unwrapListElementMaps((Object[]) 
array[0]);
     }
-    return array;
+    return ParquetRecordExtractorUtils.unwrapListElementMaps(array);
   }
 
   public Map<String, Object> extractMap(Group group) {
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java
new file mode 100644
index 00000000000..ce7b787bfdd
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.util.Map;
+
+
+/**
+ * Shared helpers for normalizing Parquet list wrapper records while extracting Pinot rows.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+final class ParquetRecordExtractorUtils {
+  /** Field name that carries the per-entry value inside a Parquet LIST's repeated group. */
+  private static final String LIST_ELEMENT_FIELD_NAME = "element";
+
+  private ParquetRecordExtractorUtils() {
+  }
+
+  /**
+   * Replaces single-entry {@code {"element": value}} wrapper maps in an extracted list with their wrapped values.
+   *
+   * <p>A list written with the 3-level Parquet LIST layout ({@code repeated group list { element }}) may be
+   * extracted as one wrapper map per entry. Unwrapping is all-or-nothing. It happens only when every non-null
+   * entry is a map whose sole key is {@code "element"} and at least one such map exists. In that case a new array
+   * is returned with {@code null} entries kept at their original positions. In every other case the input array
+   * itself is returned unchanged: null, empty or all-null input, primitive entries, multi-field maps, or a mix of
+   * wrapper and non-wrapper entries.
+   *
+   * <p>The check is purely shape-based and peels off exactly one level per call. A list whose entries are already
+   * genuine single-field structs with a field named {@code element} cannot be told apart from wrapper maps and is
+   * unwrapped as well.
+   *
+   * @param values extracted list entries; may be {@code null}
+   * @return a new array of unwrapped entries, or {@code values} itself when no unwrapping applies
+   */
+  static Object[] unwrapListElementMaps(Object[] values) {
+    // Tolerate a null conversion result instead of failing with an NPE on values.length.
+    if (values == null || values.length == 0) {
+      return values;
+    }
+
+    boolean hasElementMap = false;
+    for (Object value : values) {
+      if (value == null) {
+        continue;
+      }
+      if (!isListElementWrapper(value)) {
+        // A single non-wrapper entry means this is not a wrapped list; leave everything as-is.
+        return values;
+      }
+      hasElementMap = true;
+    }
+    if (!hasElementMap) {
+      // Only null entries: nothing to unwrap.
+      return values;
+    }
+
+    Object[] unwrappedValues = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      unwrappedValues[i] = value == null ? null : ((Map<?, ?>) value).get(LIST_ELEMENT_FIELD_NAME);
+    }
+    return unwrappedValues;
+  }
+
+  /** Returns {@code true} if {@code value} is a map whose only key is {@value #LIST_ELEMENT_FIELD_NAME}. */
+  private static boolean isListElementWrapper(Object value) {
+    if (!(value instanceof Map)) {
+      return false;
+    }
+    Map<?, ?> map = (Map<?, ?>) value;
+    return map.size() == 1 && map.containsKey(LIST_ELEMENT_FIELD_NAME);
+  }
+}
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetListElementRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetListElementRecordReaderTest.java
new file mode 100644
index 00000000000..e8ce624b3e8
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetListElementRecordReaderTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests Parquet list element wrapper handling in the native and Avro-backed 
readers.
+ *
+ * <p>This test class is stateful only through its temporary directory and is 
not thread-safe.
+ */
+public class ParquetListElementRecordReaderTest {
+  private static final String SCHEMA = "message ListElementExample {"
+      + "optional group metadata {"
+      + "optional binary element (STRING);"
+      + "optional group tags (LIST) {"
+      + "repeated group list {"
+      + "optional binary element (STRING);"
+      + "}"
+      + "}"
+      + "optional group singleTag (LIST) {"
+      + "repeated group list {"
+      + "optional binary element (STRING);"
+      + "}"
+      + "}"
+      + "optional group tagStructs (LIST) {"
+      + "repeated group list {"
+      + "optional group element {"
+      + "optional binary element (STRING);"
+      + "}"
+      + "}"
+      + "}"
+      + "}"
+      + "}";
+
+  private final File _tempDir = new File(FileUtils.getTempDirectory(), 
getClass().getSimpleName());
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Test
+  public void testNativeRecordReaderUnwrapsListElementMaps()
+      throws IOException {
+    File dataFile = writeParquetFile("native-list-element.parquet");
+    try (ParquetNativeRecordReader recordReader = new 
ParquetNativeRecordReader()) {
+      recordReader.init(dataFile, null, null);
+      assertListValues(recordReader);
+    }
+  }
+
+  @Test
+  public void testAvroRecordReaderUnwrapsListElementMaps()
+      throws IOException {
+    File dataFile = writeParquetFile("avro-list-element.parquet");
+    try (ParquetAvroRecordReader recordReader = new ParquetAvroRecordReader()) 
{
+      recordReader.init(dataFile, null, null);
+      assertListValues(recordReader);
+    }
+  }
+
+  private File writeParquetFile(String fileName)
+      throws IOException {
+    FileUtils.forceMkdir(_tempDir);
+    File dataFile = new File(_tempDir, fileName);
+    FileUtils.deleteQuietly(dataFile);
+    MessageType schema = MessageTypeParser.parseMessageType(SCHEMA);
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new 
Path(dataFile.getAbsolutePath()))
+        .withType(schema)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
+        .build()) {
+      SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+      Group group = groupFactory.newGroup();
+      Group metadata = group.addGroup("metadata");
+      metadata.append("element", "real-element-field");
+      metadata.addGroup("tags").addGroup("list").append("element", "abc");
+      metadata.getGroup("tags", 0).addGroup("list").append("element", "xyz");
+      metadata.addGroup("singleTag").addGroup("list").append("element", "one");
+      
metadata.addGroup("tagStructs").addGroup("list").addGroup("element").append("element",
 "inner-a");
+      metadata.getGroup("tagStructs", 
0).addGroup("list").addGroup("element").append("element", "inner-b");
+      writer.write(group);
+    }
+    return dataFile;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void assertListValues(RecordReader recordReader)
+      throws IOException {
+    assertTrue(recordReader.hasNext());
+    GenericRow row = recordReader.next();
+    Map<String, Object> metadata = (Map<String, Object>) 
row.getValue("metadata");
+    assertEquals(metadata.get("element"), "real-element-field");
+    assertEquals(Arrays.asList((Object[]) metadata.get("tags")), 
Arrays.asList("abc", "xyz"));
+    assertEquals(Arrays.asList((Object[]) metadata.get("singleTag")), 
Arrays.asList("one"));
+
+    Object[] tagStructs = (Object[]) metadata.get("tagStructs");
+    assertEquals(tagStructs.length, 2);
+    assertEquals(tagStructs[0], Map.of("element", "inner-a"));
+    assertEquals(tagStructs[1], Map.of("element", "inner-b"));
+    assertFalse(recordReader.hasNext());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to