This is an automated email from the ASF dual-hosted git repository. xiangfu0 pushed a commit to branch codex/parquet-list-element-reader-fix in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 14a559abc0a39d6cf799dd7b7684c5635d385a2c Author: Xiang Fu <[email protected]> AuthorDate: Fri Apr 24 13:01:56 2026 -0700 Fix Parquet list element wrapper extraction --- .../parquet/ParquetAvroRecordExtractor.java | 5 + .../parquet/ParquetNativeRecordExtractor.java | 4 +- .../parquet/ParquetRecordExtractorUtils.java | 65 ++++++++++ .../ParquetListElementRecordReaderTest.java | 142 +++++++++++++++++++++ 4 files changed, 214 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java index 8d8f282eef1..8eb2915cada 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java @@ -37,6 +37,11 @@ public class ParquetAvroRecordExtractor extends AvroRecordExtractor { return handleDeprecatedTypes(convert(value), field); } + @Override + protected Object[] convertMultiValue(Object value) { + return ParquetRecordExtractorUtils.unwrapListElementMaps(super.convertMultiValue(value)); + } + Object handleDeprecatedTypes(Object value, Schema.Field field) { Schema.Type avroColumnType = field.schema().getType(); if (avroColumnType == Schema.Type.UNION) { diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java index 4c8ac432bba..36de06d005c 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java @@ -182,9 +182,9 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> { return new Object[0]; } if (numValues == 1 && array[0] instanceof Object[]) { - return (Object[]) array[0]; + return ParquetRecordExtractorUtils.unwrapListElementMaps((Object[]) array[0]); } - return array; + return ParquetRecordExtractorUtils.unwrapListElementMaps(array); } public Map<String, Object> extractMap(Group group) { diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java new file mode 100644 index 00000000000..ce7b787bfdd --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.parquet; + +import java.util.Map; + + +/** + * Shared helpers for normalizing Parquet list wrapper records while extracting Pinot rows. + * + * <p>This class is stateless and thread-safe. + */ +final class ParquetRecordExtractorUtils { + private static final String LIST_ELEMENT_FIELD_NAME = "element"; + + private ParquetRecordExtractorUtils() { + } + + static Object[] unwrapListElementMaps(Object[] values) { + if (values.length == 0) { + return values; + } + + boolean hasElementMap = false; + for (Object value : values) { + if (value == null) { + continue; + } + if (!(value instanceof Map)) { + return values; + } + Map<?, ?> map = (Map<?, ?>) value; + if (map.size() != 1 || !map.containsKey(LIST_ELEMENT_FIELD_NAME)) { + return values; + } + hasElementMap = true; + } + if (!hasElementMap) { + return values; + } + + Object[] unwrappedValues = new Object[values.length]; + for (int i = 0; i < values.length; i++) { + Object value = values[i]; + unwrappedValues[i] = value == null ? null : ((Map<?, ?>) value).get(LIST_ELEMENT_FIELD_NAME); + } + return unwrappedValues; + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetListElementRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetListElementRecordReaderTest.java new file mode 100644 index 00000000000..e8ce624b3e8 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetListElementRecordReaderTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.parquet; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +/** + * Tests Parquet list element wrapper handling in the native and Avro-backed readers. + * + * <p>This test class is stateful only through its temporary directory and is not thread-safe. + */ +public class ParquetListElementRecordReaderTest { + private static final String SCHEMA = "message ListElementExample {" + + "optional group metadata {" + + "optional binary element (STRING);" + + "optional group tags (LIST) {" + + "repeated group list {" + + "optional binary element (STRING);" + + "}" + + "}" + + "optional group singleTag (LIST) {" + + "repeated group list {" + + "optional binary element (STRING);" + + "}" + + "}" + + "optional group tagStructs (LIST) {" + + "repeated group list {" + + "optional group element {" + + "optional binary element (STRING);" + + "}" + + "}" + + "}" + + "}" + + "}"; + + private final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()); + + @AfterClass + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(_tempDir); + } + + @Test + public void testNativeRecordReaderUnwrapsListElementMaps() + throws IOException { + File dataFile = writeParquetFile("native-list-element.parquet"); + try (ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader()) { + recordReader.init(dataFile, null, null); + assertListValues(recordReader); + } + } + + @Test + public void testAvroRecordReaderUnwrapsListElementMaps() + throws IOException { + File dataFile = writeParquetFile("avro-list-element.parquet"); + try (ParquetAvroRecordReader recordReader = new ParquetAvroRecordReader()) { + recordReader.init(dataFile, null, null); + assertListValues(recordReader); + } + } + + private File writeParquetFile(String fileName) + throws IOException { + FileUtils.forceMkdir(_tempDir); + File dataFile = new File(_tempDir, fileName); + FileUtils.deleteQuietly(dataFile); + MessageType schema = MessageTypeParser.parseMessageType(SCHEMA); + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(dataFile.getAbsolutePath())) + .withType(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) + .build()) { + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + Group group = groupFactory.newGroup(); + Group metadata = group.addGroup("metadata"); + metadata.append("element", "real-element-field"); + metadata.addGroup("tags").addGroup("list").append("element", "abc"); + metadata.getGroup("tags", 0).addGroup("list").append("element", "xyz"); + metadata.addGroup("singleTag").addGroup("list").append("element", "one"); + metadata.addGroup("tagStructs").addGroup("list").addGroup("element").append("element", "inner-a"); + metadata.getGroup("tagStructs", 0).addGroup("list").addGroup("element").append("element", "inner-b"); + writer.write(group); + } + return dataFile; + } + + @SuppressWarnings("unchecked") + private void assertListValues(RecordReader recordReader) + throws IOException { + assertTrue(recordReader.hasNext()); + GenericRow row = recordReader.next(); + Map<String, Object> metadata = (Map<String, Object>) row.getValue("metadata"); + assertEquals(metadata.get("element"), "real-element-field"); + assertEquals(Arrays.asList((Object[]) metadata.get("tags")), Arrays.asList("abc", "xyz")); + assertEquals(Arrays.asList((Object[]) metadata.get("singleTag")), Arrays.asList("one")); + + Object[] tagStructs = (Object[]) metadata.get("tagStructs"); + assertEquals(tagStructs.length, 2); + assertEquals(tagStructs[0], Map.of("element", "inner-a")); + assertEquals(tagStructs[1], Map.of("element", "inner-b")); + assertFalse(recordReader.hasNext()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
