Copilot commented on code in PR #18378:
URL: https://github.com/apache/pinot/pull/18378#discussion_r3170320516
##########
pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetCollectionAndEquivalenceTest.java:
##########
@@ -496,97 +482,45 @@ private static List<GenericRow> readAll(RecordReader reader, File file)
return rows;
}
- /**
- * Normalizes a row's value map so that representation differences between the readers don't show up as data
- * differences in the cross-reader equivalence check: {@code Object[]} becomes a {@code List}, nested Maps
- * recurse into a {@link TreeMap} (so HashMap vs LinkedHashMap iteration order doesn't matter), {@code byte[]}
- * gets content-based equality, and {@code Date}/{@code Timestamp}/{@code LocalDate}/{@code Instant} (as well
- * as the {@code "YYYY-MM-DD"} string the Avro reader emits for LocalDate) all collapse to the underlying
- * raw numeric form the native reader returns.
- */
- private static Map<String, Object> canonicalize(Map<String, Object> row) {
- Map<String, Object> out = new TreeMap<>();
+ /// Wraps `byte[]` as [ByteArray] and converts `Object[]` to `List`, recursing through nested [Map] / [List] values,
+ /// so the row map can be compared via [Map#equals] without tripping on JVM array reference-equality.
+ private static Map<String, Object> wrapArray(Map<String, Object> row) {
+ Map<String, Object> out = Maps.newHashMapWithExpectedSize(row.size());
for (Map.Entry<String, Object> e : row.entrySet()) {
- out.put(e.getKey(), canonicalizeValue(e.getValue()));
+ out.put(e.getKey(), wrapArrayValue(e.getValue()));
}
return out;
}
@SuppressWarnings("unchecked")
- private static Object canonicalizeValue(Object value) {
+ private static Object wrapArrayValue(Object value) {
+ if (value instanceof byte[]) {
+ return new ByteArray((byte[]) value);
+ }
if (value instanceof Object[]) {
- Object[] arr = (Object[]) value;
- List<Object> list = new ArrayList<>(arr.length);
- for (Object o : arr) {
- list.add(canonicalizeValue(o));
+ Object[] array = (Object[]) value;
+ List<Object> list = new ArrayList<>(array.length);
+ for (Object v : array) {
+ list.add(wrapArrayValue(v));
}
return list;
}
if (value instanceof List) {
List<Object> in = (List<Object>) value;
- List<Object> list = new ArrayList<>(in.size());
- for (Object o : in) {
- list.add(canonicalizeValue(o));
+ List<Object> out = new ArrayList<>(in.size());
+ for (Object v : in) {
+ out.add(wrapArrayValue(v));
}
- return list;
+ return out;
}
if (value instanceof Map) {
- Map<String, Object> in = (Map<String, Object>) value;
- Map<String, Object> out = new TreeMap<>();
- for (Map.Entry<String, Object> e : in.entrySet()) {
- out.put(e.getKey(), canonicalizeValue(e.getValue()));
+ Map<Object, Object> in = (Map<Object, Object>) value;
+ Map<Object, Object> out = Maps.newHashMapWithExpectedSize(in.size());
+ for (Map.Entry<Object, Object> e : in.entrySet()) {
+ out.put(e.getKey(), wrapArrayValue(e.getValue()));
}
return out;
Review Comment:
In its Map branch, wrapArrayValue() normalizes map *values* recursively but
leaves map *keys* unchanged. If an extractor returns a nested map with a
`byte[]` key (e.g., binary map keys in Parquet/ORC), `Map.equals()` will still
compare those keys by reference, because `byte[]` inherits identity-based
`equals()`/`hashCode()`, and this equivalence test will fail even when the two
readers produce logically identical rows. Consider normalizing keys too: at
least `byte[]` → ByteArray, and recursively for Object[]/Map/List keys if they
can appear.
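
A minimal sketch of what key normalization could look like, written to be self-contained rather than as a drop-in patch. `KeyNormalizationSketch`, `BytesKey`, and `normalize` are hypothetical names; `BytesKey` stands in for Pinot's `ByteArray` and only assumes content-based `equals()`/`hashCode()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeyNormalizationSketch {

  // Hypothetical stand-in for Pinot's ByteArray: compares byte[] by content.
  static final class BytesKey {
    private final byte[] _bytes;

    BytesKey(byte[] bytes) {
      _bytes = bytes;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof BytesKey && Arrays.equals(_bytes, ((BytesKey) o)._bytes);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(_bytes);
    }
  }

  // Same shape as wrapArrayValue in the diff, except that the Map branch
  // normalizes the key as well as the value.
  static Object normalize(Object value) {
    if (value instanceof byte[]) {
      return new BytesKey((byte[]) value);
    }
    if (value instanceof Object[]) {
      Object[] array = (Object[]) value;
      List<Object> list = new ArrayList<>(array.length);
      for (Object v : array) {
        list.add(normalize(v));
      }
      return list;
    }
    if (value instanceof List) {
      List<?> in = (List<?>) value;
      List<Object> out = new ArrayList<>(in.size());
      for (Object v : in) {
        out.add(normalize(v));
      }
      return out;
    }
    if (value instanceof Map) {
      Map<?, ?> in = (Map<?, ?>) value;
      Map<Object, Object> out = new HashMap<>();
      for (Map.Entry<?, ?> e : in.entrySet()) {
        // Keys go through the same normalization as values.
        out.put(normalize(e.getKey()), normalize(e.getValue()));
      }
      return out;
    }
    return value;
  }
}
```

With this shape, two maps that each hold a distinct `byte[]` instance with the same content as a key compare unequal before normalization and equal after it.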
##########
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.orc;
+
+import com.google.common.collect.Maps;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/// Extracts a single ORC row into a [GenericRow]. Input is an [ORCRecordExtractor.Record] handle wrapping
+/// a [VectorizedRowBatch] + schema + row index. Dispatch happens in [#extractValue] (complex types —
+/// `LIST` / `MAP` / `STRUCT`) and [#extractSingleValue] (primitives); ORC values never flow through
+/// `convertSingleValue`, so widening / `Temporal` handling is done locally here.
+///
+/// **ORC schema category → Java output type:**
+/// - `BOOLEAN` → `Boolean`
+/// - `BYTE` / `SHORT` / `INT` → `Integer` (small ints widen to `Integer`)
+/// - `LONG` → `Long`
+/// - `FLOAT` → `Float`
+/// - `DOUBLE` → `Double`
+/// - `DECIMAL` → `BigDecimal`
+/// - `STRING` / `VARCHAR` / `CHAR` → `String`
+/// - `BINARY` → `byte[]`
+/// - `DATE` → [LocalDate] via [LocalDate#ofEpochDay] (TZ-independent, calendar-date semantics)
+/// - `TIMESTAMP` / `TIMESTAMP_INSTANT` → [Timestamp] preserving full sub-second nanos from [TimestampColumnVector]
+/// - `LIST<X>` → `Object[]` (null elements preserved; empty list surfaces as empty `Object[]`)
+/// - `MAP<K, V>` → `Map<Object, Object>`
+/// - `STRUCT<...>` → `Map<String, Object>`
+/// - any nullable column with `isNull[rowId]` set → `null`
+public class ORCRecordExtractor extends BaseRecordExtractor<ORCRecordExtractor.Record> {
+
+ /// One ORC row's worth of state. The reader allocates a single instance and calls [#set] on each row
+ /// advance — no per-row allocation.
+ public static final class Record {
+ VectorizedRowBatch _batch;
+ TypeDescription _schema;
+ int _rowId;
+
+ public void set(VectorizedRowBatch batch, TypeDescription schema, int rowId) {
+ _batch = batch;
+ _schema = schema;
+ _rowId = rowId;
+ }
+ }
+
+ // Cached `_fields ∩ source-schema` → child-index map for the include-list path. The ORC source schema is
+ // fixed for the lifetime of the reader (and therefore the extractor instance), so we build this lazily on
+ // the first row and reuse it from then on — no per-row schema-identity check needed. Iterating this map
+ // directly drives the extraction loop, so no per-row HashMap lookup against the source schema either.
+ private Map<String, Integer> _fieldToIndexMap;
+
+ @Override
+ public GenericRow extract(Record from, GenericRow to) {
+ TypeDescription schema = from._schema;
+ List<TypeDescription> fieldTypes = schema.getChildren();
+ ColumnVector[] cols = from._batch.cols;
+ int rowId = from._rowId;
+ if (_extractAll) {
+ List<String> fieldNames = schema.getFieldNames();
+ int numFields = fieldNames.size();
+ for (int i = 0; i < numFields; i++) {
+ String fieldName = fieldNames.get(i);
+ to.putValue(fieldName, extractValue(fieldName, cols[i], fieldTypes.get(i), rowId));
+ }
+ } else {
+ if (_fieldToIndexMap == null) {
+ _fieldToIndexMap = buildFieldToIndexMap(schema, _fields);
+ }
+ for (Map.Entry<String, Integer> entry : _fieldToIndexMap.entrySet()) {
+ int index = entry.getValue();
+ to.putValue(entry.getKey(), extractValue(entry.getKey(), cols[index], fieldTypes.get(index), rowId));
+ }
+ }
+ return to;
+ }
+
+ private static Map<String, Integer> buildFieldToIndexMap(TypeDescription schema, Set<String> fields) {
+ Map<String, Integer> map = Maps.newHashMapWithExpectedSize(fields.size());
+ List<String> fieldNames = schema.getFieldNames();
+ int numFieldsInSchema = fieldNames.size();
+ for (int i = 0; i < numFieldsInSchema; i++) {
+ String name = fieldNames.get(i);
+ if (fields.contains(name)) {
+ map.put(name, i);
+ }
+ }
+ return map;
+ }
+
+ /// Extracts the value at `rowId` from `columnVector`, dispatching by `fieldType.getCategory()`. Recurses
+ /// into [#extractValue] for nested complex types and falls through to [#extractSingleValue] for primitives.
+ @Nullable
+ private static Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+ if (columnVector.isRepeating) {
+ rowId = 0;
+ }
+ if (!columnVector.noNulls && columnVector.isNull[rowId]) {
+ return null;
+ }
+ TypeDescription.Category category = fieldType.getCategory();
+ switch (category) {
+ case LIST: {
+ TypeDescription childType = fieldType.getChildren().get(0);
+ ListColumnVector listColumnVector = (ListColumnVector) columnVector;
+ int offset = (int) listColumnVector.offsets[rowId];
+ int length = (int) listColumnVector.lengths[rowId];
+ Object[] values = new Object[length];
+ for (int j = 0; j < length; j++) {
+ values[j] = extractValue(field, listColumnVector.child, childType, offset + j);
+ }
+ return values;
+ }
+ case MAP: {
+ // Map keys go straight to `extractSingleValue` instead of `extractValue` — we deliberately skip the
+ // `isRepeating` / null guards that `extractValue` performs. ORC's format invariants guarantee map
+ // keys are non-null, and the Apache ORC reader populates each entry individually instead of
+ // collapsing the child key vector to `isRepeating = true`. Adding the guards here would be defensive
+ // against API states no real reader produces.
+ List<TypeDescription> children = fieldType.getChildren();
+ TypeDescription.Category keyCategory = children.get(0).getCategory();
+ TypeDescription valueType = children.get(1);
+ MapColumnVector mapColumnVector = (MapColumnVector) columnVector;
+ int offset = (int) mapColumnVector.offsets[rowId];
+ int length = (int) mapColumnVector.lengths[rowId];
+ Map<Object, Object> map = new HashMap<>();
+ for (int j = 0; j < length; j++) {
+ int childRowId = offset + j;
+ Object key = extractSingleValue(field, mapColumnVector.keys, childRowId, keyCategory);
+ Object value = extractValue(field, mapColumnVector.values, valueType, childRowId);
+ map.put(key, value);
Review Comment:
ORC MAP key extraction calls extractSingleValue() directly on the child key
vector, but extractSingleValue() does not honor ColumnVector.isRepeating and
skips the null guard that extractValue() applies. If the key vector is ever
marked as repeating for a batch, only index 0 is valid, so reading at
childRowId can return incorrect keys or go out of bounds. A safer approach is
to apply the repeating adjustment inside extractSingleValue(), or to route keys
through extractValue() and then enforce non-null, so that MAP keys follow the
same vector semantics as other primitives.
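
A minimal sketch of the guard being suggested, kept free of the Hive dependency so it runs on its own. `RepeatingKeySketch`, `MiniLongVector`, and `readKey` are hypothetical names; `MiniLongVector` only mirrors the `vector`, `isNull`, `noNulls`, and `isRepeating` fields of Hive's `LongColumnVector`, and the real fix would live in `extractSingleValue()` or its caller.

```java
final class RepeatingKeySketch {

  // Hypothetical stand-in mirroring the public fields of Hive's LongColumnVector.
  static final class MiniLongVector {
    final long[] vector;
    final boolean[] isNull;
    boolean noNulls = true;
    boolean isRepeating = false;

    MiniLongVector(long[] values) {
      vector = values;
      isNull = new boolean[values.length];
    }
  }

  // Reads a map key with the same vector semantics extractValue() applies:
  // collapse to index 0 when the vector is repeating, then reject nulls,
  // because ORC map keys are expected to be non-null.
  static long readKey(MiniLongVector keys, int rowId) {
    int index = keys.isRepeating ? 0 : rowId;
    if (!keys.noNulls && keys.isNull[index]) {
      throw new IllegalStateException("Null map key at row " + rowId);
    }
    return keys.vector[index];
  }
}
```

The guard costs one branch per key and makes the code independent of whether a given reader ever sets `isRepeating` on the key vector.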
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]