xiangfu0 commented on code in PR #18325:
URL: https://github.com/apache/pinot/pull/18325#discussion_r3141567283


##########
pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * Shared helpers for normalizing Parquet list wrapper records while extracting Pinot rows.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+final class ParquetRecordExtractorUtils {
+  private static final String LIST_ELEMENT_FIELD_NAME = "element";
+  private static final String MAP_ENTRIES_FIELD_NAME = "key_value";
+  private static final String MAP_KEY_FIELD_NAME = "key";
+  private static final String MAP_VALUE_FIELD_NAME = "value";
+
+  private ParquetRecordExtractorUtils() {
+  }
+
+  static Object[] unwrapListElementMaps(Object[] values) {
+    if (values.length == 0) {
+      return values;
+    }
+
+    boolean hasElementMap = false;
+    for (Object value : values) {
+      if (value == null) {
+        continue;
+      }
+      if (!(value instanceof Map)) {
+        return values;
+      }
+      Map<?, ?> map = (Map<?, ?>) value;
+      if (map.size() != 1 || !map.containsKey(LIST_ELEMENT_FIELD_NAME)) {
+        return values;
+      }
+      hasElementMap = true;
+    }
+    if (!hasElementMap) {
+      return values;
+    }
+
+    Object[] unwrappedValues = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      unwrappedValues[i] = value == null ? null : ((Map<?, ?>) value).get(LIST_ELEMENT_FIELD_NAME);
+    }
+    return unwrappedValues;
+  }
+
+  static Map<String, Object> unwrapMapKeyValues(Map<String, Object> map) {
+    if (map.size() != 1 || !map.containsKey(MAP_ENTRIES_FIELD_NAME)) {

Review Comment:
   The `Map.containsKey` + `Map.get` pattern is gone: the new code resolves the
Parquet field indices from the schema once and reads `key` / `value` directly.
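
   For illustration only, a minimal sketch of the "resolve indices once, read by position" idea. It does not use the Parquet API: the schema is modeled as a plain list of field names and each entry as a positional `Object[]`. The class and method names (`KeyValueIndexSketch`, `extract`) are hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for reading a key_value group. Field names come from a
 * schema (modeled as a List<String>) and each entry is a positional Object[].
 * None of these names are from the PR or from the Parquet API.
 */
final class KeyValueIndexSketch {
  private final int keyIndex;
  private final int valueIndex;

  // Resolve the positions of "key" and "value" once per schema, not once per entry.
  KeyValueIndexSketch(List<String> schemaFieldNames) {
    keyIndex = schemaFieldNames.indexOf("key");
    valueIndex = schemaFieldNames.indexOf("value");
    if (keyIndex < 0 || valueIndex < 0) {
      throw new IllegalArgumentException("Schema must declare 'key' and 'value': " + schemaFieldNames);
    }
  }

  // Read every entry by position, so there is no containsKey + get pair per entry.
  Map<String, Object> extract(List<Object[]> entries) {
    Map<String, Object> result = new HashMap<>();
    for (Object[] entry : entries) {
      result.put(String.valueOf(entry[keyIndex]), entry[valueIndex]);
    }
    return result;
  }

  public static void main(String[] args) {
    KeyValueIndexSketch sketch = new KeyValueIndexSketch(List.of("key", "value"));
    List<Object[]> entries = List.of(new Object[]{"a", 1}, new Object[]{"b", 2});
    System.out.println(sketch.extract(entries).get("b"));
  }
}
```

   The point of the shape is that the name lookup cost is paid in the constructor, so the per-entry loop only does array reads.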



##########
pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java:
##########
@@ -0,0 +1,112 @@
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * Shared helpers for normalizing Parquet list wrapper records while extracting Pinot rows.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+final class ParquetRecordExtractorUtils {
+  private static final String LIST_ELEMENT_FIELD_NAME = "element";
+  private static final String MAP_ENTRIES_FIELD_NAME = "key_value";
+  private static final String MAP_KEY_FIELD_NAME = "key";
+  private static final String MAP_VALUE_FIELD_NAME = "value";
+
+  private ParquetRecordExtractorUtils() {
+  }
+
+  static Object[] unwrapListElementMaps(Object[] values) {
+    if (values.length == 0) {
+      return values;
+    }
+
+    boolean hasElementMap = false;
+    for (Object value : values) {
+      if (value == null) {
+        continue;
+      }
+      if (!(value instanceof Map)) {
+        return values;
+      }
+      Map<?, ?> map = (Map<?, ?>) value;
+      if (map.size() != 1 || !map.containsKey(LIST_ELEMENT_FIELD_NAME)) {
+        return values;
+      }
+      hasElementMap = true;
+    }
+    if (!hasElementMap) {
+      return values;
+    }
+
+    Object[] unwrappedValues = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      unwrappedValues[i] = value == null ? null : ((Map<?, ?>) value).get(LIST_ELEMENT_FIELD_NAME);
+    }
+    return unwrappedValues;
+  }
+
+  static Map<String, Object> unwrapMapKeyValues(Map<String, Object> map) {
+    if (map.size() != 1 || !map.containsKey(MAP_ENTRIES_FIELD_NAME)) {
+      return map;
+    }
+
+    Object entries = map.get(MAP_ENTRIES_FIELD_NAME);
+    Object[] entryArray = toEntryArray(entries);
+    if (entryArray == null || entryArray.length == 0) {
+      return new LinkedHashMap<>();

Review Comment:
   Done. `extractKeyValueMap` now returns `Map.of()` for the empty case.
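
   A small sketch of what that empty-case contract looks like from a caller's side, using only the JDK. `EmptyMapSketch.toMap` is a hypothetical helper written for this example and is not the PR's `extractKeyValueMap`. The one behavior worth knowing is that `Map.of()` is unmodifiable, so callers that used to mutate the returned `LinkedHashMap` would now get an `UnsupportedOperationException`.

```java
import java.util.HashMap;
import java.util.Map;

final class EmptyMapSketch {
  // Hypothetical helper: each entry is a {key, value} pair.
  // Returns the unmodifiable Map.of() when there is nothing to copy.
  static Map<String, Object> toMap(Object[][] entries) {
    if (entries == null || entries.length == 0) {
      return Map.of();
    }
    Map<String, Object> result = new HashMap<>();
    for (Object[] entry : entries) {
      result.put(String.valueOf(entry[0]), entry[1]);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> empty = toMap(new Object[0][]);
    System.out.println(empty.isEmpty());
    try {
      empty.put("k", "v");
    } catch (UnsupportedOperationException e) {
      // Map.of() rejects mutation, unlike the LinkedHashMap it replaces.
      System.out.println("empty result is immutable");
    }
  }
}
```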



##########
pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorUtils.java:
##########
@@ -0,0 +1,112 @@
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * Shared helpers for normalizing Parquet list wrapper records while extracting Pinot rows.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+final class ParquetRecordExtractorUtils {
+  private static final String LIST_ELEMENT_FIELD_NAME = "element";
+  private static final String MAP_ENTRIES_FIELD_NAME = "key_value";
+  private static final String MAP_KEY_FIELD_NAME = "key";
+  private static final String MAP_VALUE_FIELD_NAME = "value";
+
+  private ParquetRecordExtractorUtils() {
+  }
+
+  static Object[] unwrapListElementMaps(Object[] values) {
+    if (values.length == 0) {
+      return values;
+    }
+
+    boolean hasElementMap = false;
+    for (Object value : values) {
+      if (value == null) {
+        continue;
+      }
+      if (!(value instanceof Map)) {
+        return values;
+      }
+      Map<?, ?> map = (Map<?, ?>) value;
+      if (map.size() != 1 || !map.containsKey(LIST_ELEMENT_FIELD_NAME)) {
+        return values;
+      }
+      hasElementMap = true;
+    }
+    if (!hasElementMap) {
+      return values;
+    }
+
+    Object[] unwrappedValues = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      unwrappedValues[i] = value == null ? null : ((Map<?, ?>) value).get(LIST_ELEMENT_FIELD_NAME);
+    }
+    return unwrappedValues;
+  }
+
+  static Map<String, Object> unwrapMapKeyValues(Map<String, Object> map) {
+    if (map.size() != 1 || !map.containsKey(MAP_ENTRIES_FIELD_NAME)) {
+      return map;
+    }
+
+    Object entries = map.get(MAP_ENTRIES_FIELD_NAME);
+    Object[] entryArray = toEntryArray(entries);
+    if (entryArray == null || entryArray.length == 0) {
+      return new LinkedHashMap<>();
+    }
+
+    Map<String, Object> unwrappedMap = new LinkedHashMap<>();

Review Comment:
   After looking into this: Parquet does not guarantee MAP read order, since
writers, page boundaries, and dictionary encodings can all reorder entries.
I documented this on `extractKeyValueMap` and pointed users who need a stable
order to `LIST<STRUCT<key, value>>`. The method now returns a `HashMap`,
consistent with the rest of the native extractor.
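
   To make the trade-off concrete, here is a JDK-only sketch of the two shapes, taking the ordering statement in this comment as given. It models a `LIST<STRUCT<key, value>>` column as a `List<Map.Entry<String, Object>>` and a MAP column as a `HashMap`. The class and method names are hypothetical and nothing here touches the Parquet reader.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MapOrderSketch {
  // MAP-like view: lookup by key, but HashMap iteration order is unspecified.
  static Map<String, Object> asLookup(List<Map.Entry<String, Object>> entries) {
    Map<String, Object> lookup = new HashMap<>();
    for (Map.Entry<String, Object> entry : entries) {
      lookup.put(entry.getKey(), entry.getValue());
    }
    return lookup;
  }

  // LIST<STRUCT<key, value>>-like view: keys come back in list order.
  static List<String> keysInListOrder(List<Map.Entry<String, Object>> entries) {
    List<String> keys = new ArrayList<>(entries.size());
    for (Map.Entry<String, Object> entry : entries) {
      keys.add(entry.getKey());
    }
    return keys;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Object>> entries = List.of(
        Map.<String, Object>entry("zeta", 1),
        Map.<String, Object>entry("alpha", 2));
    System.out.println(keysInListOrder(entries));
    System.out.println(asLookup(entries).get("alpha"));
  }
}
```

   Callers that only need lookups can use the map view. Callers that depend on position should keep the list of pairs.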



##########
pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetCollectionRecordReaderTest.java:
##########
@@ -0,0 +1,271 @@
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests Parquet collection wrapper handling in the native and Avro-backed readers.
+ *
+ * <p>This test class is stateful only through its temporary directory and is not thread-safe.
+ */
+public class ParquetCollectionRecordReaderTest {
+  private static final String SCHEMA = "message CollectionWrapperExample {"
+      + "optional group topLevelTags (LIST) {"

Review Comment:
   Test schemas are now built with `String.join("\n", ...)` instead of
hand-concatenated strings, which makes them much easier to read.
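
   As a rough picture of the style, one line of schema per argument. Only the message name and `topLevelTags` come from the quoted hunk; the inner `list` / `element` fields follow the usual three-level LIST layout and are an assumption, not the PR's actual test schema.

```java
final class SchemaStringSketch {
  // Illustrative schema text only; fields below topLevelTags are assumed.
  static final String SCHEMA = String.join("\n",
      "message CollectionWrapperExample {",
      "  optional group topLevelTags (LIST) {",
      "    repeated group list {",
      "      optional binary element (UTF8);",
      "    }",
      "  }",
      "}");

  public static void main(String[] args) {
    System.out.println(SCHEMA);
  }
}
```

   Each argument maps to one schema line, so nesting is visible at a glance and a missing brace is easier to spot than in a chain of `+` concatenations.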



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

