Copilot commented on code in PR #18378:
URL: https://github.com/apache/pinot/pull/18378#discussion_r3170320516


##########
pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetCollectionAndEquivalenceTest.java:
##########
@@ -496,97 +482,45 @@ private static List<GenericRow> readAll(RecordReader 
reader, File file)
     return rows;
   }
 
-  /**
-   * Normalizes a row's value map so that representation differences between 
the readers don't show up as data
-   * differences in the cross-reader equivalence check: {@code Object[]} 
becomes a {@code List}, nested Maps
-   * recurse into a {@link TreeMap} (so HashMap vs LinkedHashMap iteration 
order doesn't matter), {@code byte[]}
-   * gets content-based equality, and {@code Date}/{@code Timestamp}/{@code 
LocalDate}/{@code Instant} (as well
-   * as the {@code "YYYY-MM-DD"} string the Avro reader emits for LocalDate) 
all collapse to the underlying
-   * raw numeric form the native reader returns.
-   */
-  private static Map<String, Object> canonicalize(Map<String, Object> row) {
-    Map<String, Object> out = new TreeMap<>();
+  /// Wraps `byte[]` as [ByteArray] and converts `Object[]` to `List`, 
recursing through nested [Map] / [List] values,
+  /// so the row map can be compared via [Map#equals] without tripping on JVM 
array reference-equality.
+  private static Map<String, Object> wrapArray(Map<String, Object> row) {
+    Map<String, Object> out = Maps.newHashMapWithExpectedSize(row.size());
     for (Map.Entry<String, Object> e : row.entrySet()) {
-      out.put(e.getKey(), canonicalizeValue(e.getValue()));
+      out.put(e.getKey(), wrapArrayValue(e.getValue()));
     }
     return out;
   }
 
   @SuppressWarnings("unchecked")
-  private static Object canonicalizeValue(Object value) {
+  private static Object wrapArrayValue(Object value) {
+    if (value instanceof byte[]) {
+      return new ByteArray((byte[]) value);
+    }
     if (value instanceof Object[]) {
-      Object[] arr = (Object[]) value;
-      List<Object> list = new ArrayList<>(arr.length);
-      for (Object o : arr) {
-        list.add(canonicalizeValue(o));
+      Object[] array = (Object[]) value;
+      List<Object> list = new ArrayList<>(array.length);
+      for (Object v : array) {
+        list.add(wrapArrayValue(v));
       }
       return list;
     }
     if (value instanceof List) {
       List<Object> in = (List<Object>) value;
-      List<Object> list = new ArrayList<>(in.size());
-      for (Object o : in) {
-        list.add(canonicalizeValue(o));
+      List<Object> out = new ArrayList<>(in.size());
+      for (Object v : in) {
+        out.add(wrapArrayValue(v));
       }
-      return list;
+      return out;
     }
     if (value instanceof Map) {
-      Map<String, Object> in = (Map<String, Object>) value;
-      Map<String, Object> out = new TreeMap<>();
-      for (Map.Entry<String, Object> e : in.entrySet()) {
-        out.put(e.getKey(), canonicalizeValue(e.getValue()));
+      Map<Object, Object> in = (Map<Object, Object>) value;
+      Map<Object, Object> out = Maps.newHashMapWithExpectedSize(in.size());
+      for (Map.Entry<Object, Object> e : in.entrySet()) {
+        out.put(e.getKey(), wrapArrayValue(e.getValue()));
       }
       return out;

Review Comment:
   In wrapArrayValue(Map) you only wrap/normalize the *values* recursively, but 
you leave map *keys* unchanged. If any extractor returns a nested map with a 
`byte[]` key (e.g., binary map keys in Parquet/ORC), `Map.equals()` will still 
use reference-equality for those keys and this equivalence test will fail even 
when the two readers are logically identical. Consider wrapping/normalizing 
keys too (at least `byte[]` → ByteArray, and recursively handling 
Object[]/Map/List keys if they can appear).



##########
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.orc;
+
+import com.google.common.collect.Maps;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/// Extracts a single ORC row into a [GenericRow]. Input is an 
[ORCRecordExtractor.Record] handle wrapping
+/// a [VectorizedRowBatch] + schema + row index. Dispatch happens in 
[#extractValue] (complex types —
+/// `LIST` / `MAP` / `STRUCT`) and [#extractSingleValue] (primitives); ORC 
values never flow through
+/// `convertSingleValue`, so widening / `Temporal` handling is done locally 
here.
+///
+/// **ORC schema category → Java output type:**
+/// - `BOOLEAN` → `Boolean`
+/// - `BYTE` / `SHORT` / `INT` → `Integer` (small ints widen to `Integer`)
+/// - `LONG` → `Long`
+/// - `FLOAT` → `Float`
+/// - `DOUBLE` → `Double`
+/// - `DECIMAL` → `BigDecimal`
+/// - `STRING` / `VARCHAR` / `CHAR` → `String`
+/// - `BINARY` → `byte[]`
+/// - `DATE` → [LocalDate] via [LocalDate#ofEpochDay] (TZ-independent, 
calendar-date semantics)
+/// - `TIMESTAMP` / `TIMESTAMP_INSTANT` → [Timestamp] preserving full 
sub-second nanos from [TimestampColumnVector]
+/// - `LIST<X>` → `Object[]` (null elements preserved; empty list surfaces as 
empty `Object[]`)
+/// - `MAP<K, V>` → `Map<Object, Object>`
+/// - `STRUCT<...>` → `Map<String, Object>`
+/// - any nullable column with `isNull[rowId]` set → `null`
+public class ORCRecordExtractor extends 
BaseRecordExtractor<ORCRecordExtractor.Record> {
+
+  /// One ORC row's worth of state. The reader allocates a single instance and 
calls [#set] on each row
+  /// advance — no per-row allocation.
+  public static final class Record {
+    VectorizedRowBatch _batch;
+    TypeDescription _schema;
+    int _rowId;
+
+    public void set(VectorizedRowBatch batch, TypeDescription schema, int 
rowId) {
+      _batch = batch;
+      _schema = schema;
+      _rowId = rowId;
+    }
+  }
+
+  // Cached `_fields ∩ source-schema` → child-index map for the include-list 
path. The ORC source schema is
+  // fixed for the lifetime of the reader (and therefore the extractor 
instance), so we build this lazily on
+  // the first row and reuse it from then on — no per-row schema-identity 
check needed. Iterating this map
+  // directly drives the extraction loop, so no per-row HashMap lookup against 
the source schema either.
+  private Map<String, Integer> _fieldToIndexMap;
+
+  @Override
+  public GenericRow extract(Record from, GenericRow to) {
+    TypeDescription schema = from._schema;
+    List<TypeDescription> fieldTypes = schema.getChildren();
+    ColumnVector[] cols = from._batch.cols;
+    int rowId = from._rowId;
+    if (_extractAll) {
+      List<String> fieldNames = schema.getFieldNames();
+      int numFields = fieldNames.size();
+      for (int i = 0; i < numFields; i++) {
+        String fieldName = fieldNames.get(i);
+        to.putValue(fieldName, extractValue(fieldName, cols[i], 
fieldTypes.get(i), rowId));
+      }
+    } else {
+      if (_fieldToIndexMap == null) {
+        _fieldToIndexMap = buildFieldToIndexMap(schema, _fields);
+      }
+      for (Map.Entry<String, Integer> entry : _fieldToIndexMap.entrySet()) {
+        int index = entry.getValue();
+        to.putValue(entry.getKey(), extractValue(entry.getKey(), cols[index], 
fieldTypes.get(index), rowId));
+      }
+    }
+    return to;
+  }
+
+  private static Map<String, Integer> buildFieldToIndexMap(TypeDescription 
schema, Set<String> fields) {
+    Map<String, Integer> map = Maps.newHashMapWithExpectedSize(fields.size());
+    List<String> fieldNames = schema.getFieldNames();
+    int numFieldsInSchema = fieldNames.size();
+    for (int i = 0; i < numFieldsInSchema; i++) {
+      String name = fieldNames.get(i);
+      if (fields.contains(name)) {
+        map.put(name, i);
+      }
+    }
+    return map;
+  }
+
+  /// Extracts the value at `rowId` from `columnVector`, dispatching by 
`fieldType.getCategory()`. Recurses
+  /// into [#extractValue] for nested complex types and falls through to 
[#extractSingleValue] for primitives.
+  @Nullable
+  private static Object extractValue(String field, ColumnVector columnVector, 
TypeDescription fieldType, int rowId) {
+    if (columnVector.isRepeating) {
+      rowId = 0;
+    }
+    if (!columnVector.noNulls && columnVector.isNull[rowId]) {
+      return null;
+    }
+    TypeDescription.Category category = fieldType.getCategory();
+    switch (category) {
+      case LIST: {
+        TypeDescription childType = fieldType.getChildren().get(0);
+        ListColumnVector listColumnVector = (ListColumnVector) columnVector;
+        int offset = (int) listColumnVector.offsets[rowId];
+        int length = (int) listColumnVector.lengths[rowId];
+        Object[] values = new Object[length];
+        for (int j = 0; j < length; j++) {
+          values[j] = extractValue(field, listColumnVector.child, childType, 
offset + j);
+        }
+        return values;
+      }
+      case MAP: {
+        // Map keys go straight to `extractSingleValue` instead of 
`extractValue` — we deliberately skip the
+        // `isRepeating` / null guards that `extractValue` performs. ORC's 
format invariants guarantee map
+        // keys are non-null, and the Apache ORC reader populates each entry 
individually instead of
+        // collapsing the child key vector to `isRepeating = true`. Adding the 
guards here would be defensive
+        // against API states no real reader produces.
+        List<TypeDescription> children = fieldType.getChildren();
+        TypeDescription.Category keyCategory = children.get(0).getCategory();
+        TypeDescription valueType = children.get(1);
+        MapColumnVector mapColumnVector = (MapColumnVector) columnVector;
+        int offset = (int) mapColumnVector.offsets[rowId];
+        int length = (int) mapColumnVector.lengths[rowId];
+        Map<Object, Object> map = new HashMap<>();
+        for (int j = 0; j < length; j++) {
+          int childRowId = offset + j;
+          Object key = extractSingleValue(field, mapColumnVector.keys, 
childRowId, keyCategory);
+          Object value = extractValue(field, mapColumnVector.values, 
valueType, childRowId);
+          map.put(key, value);

Review Comment:
   ORC MAP key extraction calls extractSingleValue() directly on the child key 
vector, but extractSingleValue() does not honor ColumnVector.isRepeating (and 
skips the null guard that extractValue() applies). If ORC marks the key vector 
as repeating for a batch, this will read the wrong index and can lead to 
incorrect keys or out-of-bounds access. Safer approach: apply the repeating 
adjustment inside extractSingleValue (or route keys through extractValue and 
then enforce non-null), so MAP keys follow the same vector semantics as other 
primitives.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to