Copilot commented on code in PR #18400:
URL: https://github.com/apache/pinot/pull/18400#discussion_r3180329149


##########
pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java:
##########
@@ -311,11 +307,31 @@ private Map<String, Object> extractKeyValueMap(Group 
group) {
       Group keyValueGroup = group.getGroup(0, i);
       Object key = extractValue(keyValueGroup, keyIndex);
       Object value = extractValue(keyValueGroup, valueIndex);
-      map.put(key.toString(), value);
+      map.put(stringifyMapKey(key), value);
     }
     return map;
   }
 
+  /// Stringifies a map key for the Pinot `Map<String, Object>` contract, 
matching the JSON-serialization
+  /// convention used elsewhere in this PR (`JsonUtils` keeps 
`WRITE_DATES_AS_TIMESTAMPS = true`):
+  /// - `byte[]` (unannotated BINARY / FIXED) → base64. Same form Jackson uses 
for `byte[]` values, so the
+  ///   key reads uniformly with the rest of a serialized map. Pinot's 
BYTES→STRING column-type conversion
+  ///   uses hex via `BytesUtils.toHexString`, but the MAP path goes through 
Jackson — base64 wins on
+  ///   consistency here.
+  /// - `Timestamp` → numeric epoch millis as a string. Matches Jackson's 
Timestamp value-serialization
+  ///   (`WRITE_DATES_AS_TIMESTAMPS = true`) and avoids `Timestamp#toString`'s 
JVM-default-TZ dependency.
+  /// - Everything else (`String`, `Integer`, `Long`, `Float`, `Double`, 
`Boolean`, `BigDecimal`, `UUID`,
+  ///   `LocalDate`, `LocalTime`) has a stable, TZ-independent `toString`.
+  private static String stringifyMapKey(Object key) {
+    if (key instanceof byte[]) {
+      return Base64.getEncoder().encodeToString((byte[]) key);
+    }
+    if (key instanceof Timestamp) {
+      return Long.toString(((Timestamp) key).getTime());

Review Comment:
   `stringifyMapKey()` collapses every `Timestamp` key to 
`timestamp.getTime()`, which drops the microsecond/nanosecond part. For Parquet 
maps whose key type is `TIMESTAMP_MICROS`, `TIMESTAMP_NANOS`, or `INT96`, 
distinct keys that fall in the same millisecond then stringify to the same 
value, so later entries silently overwrite earlier ones and the extracted map 
is corrupted.
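
A minimal, standalone sketch of the collision described above, using only the JDK. `millisKey` mirrors the `Long.toString(ts.getTime())` branch from the diff; `nanosKey` is a hypothetical alternative (not part of the PR) that keeps the full nanosecond field, shown only to illustrate one way the keys could stay distinct. The class and method names are made up for this example.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class TimestampKeyCollision {

  // Same shape as the Timestamp branch in the diff: epoch millis only,
  // so any sub-millisecond digits are dropped.
  static String millisKey(Timestamp ts) {
    return Long.toString(ts.getTime());
  }

  // Hypothetical alternative (an assumption, not the PR's code):
  // epoch seconds plus the zero-padded nanosecond-of-second field.
  static String nanosKey(Timestamp ts) {
    Instant instant = ts.toInstant();
    return instant.getEpochSecond() + "." + String.format(Locale.ROOT, "%09d", instant.getNano());
  }

  public static void main(String[] args) {
    // Two distinct microsecond-precision keys inside the same millisecond.
    Timestamp a = Timestamp.from(Instant.parse("2024-01-01T00:00:00.123456Z"));
    Timestamp b = Timestamp.from(Instant.parse("2024-01-01T00:00:00.123789Z"));

    Map<String, Object> byMillis = new HashMap<>();
    byMillis.put(millisKey(a), "first");
    byMillis.put(millisKey(b), "second"); // overwrites "first"

    Map<String, Object> byNanos = new HashMap<>();
    byNanos.put(nanosKey(a), "first");
    byNanos.put(nanosKey(b), "second");

    System.out.println("millis keys: " + byMillis.size()); // 1 entry survives
    System.out.println("nanos keys:  " + byNanos.size());  // 2 entries survive
  }
}
```

Whether a seconds-plus-nanos string is the right key format for the Pinot map contract is a separate design question; the sketch only shows that any key derived from `getTime()` alone cannot distinguish these two inputs.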



##########
pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java:
##########
@@ -48,87 +58,189 @@
 /// - avro nested `record` → [GenericRecord] → `Map<String, Object>`
 /// - avro `union[null, X]` with the `null` branch selected → `null`
 ///
-/// **Logical types** (applied when `enableLogicalTypes = true`, the config 
default; `GenericDatumReader` emits
-/// the post-conversion Java types listed below):
-/// - `decimal` → `BigDecimal` → `BigDecimal`
-/// - `uuid` → `UUID` → `String` (UUID string form via `UUID.toString()`)
-/// - `date` → `LocalDate` → `LocalDate` (passes through; TZ-independent)
-/// - `time-millis` / `time-micros` → `LocalTime` → `LocalTime` (passes 
through; full ns precision preserved)
-/// - `timestamp-millis` / `timestamp-micros` → `Instant` → 
`java.sql.Timestamp` (sub-millisecond nanos preserved
-///   via `Timestamp.getNanos()`)
+/// **Logical types** (`GenericDatumReader` emits the post-Avro-conversion 
Java type, then this extractor maps to the
+///   Pinot contract type):
+/// - `decimal` → `BigDecimal` → `BigDecimal` (always converted; raw bytes 
aren't interpretable without external
+///   precision/scale)
+/// - `timestamp-millis` → `Instant` → [Timestamp], or `Long` raw epoch millis 
when `extractRawTimeValues`
+///   is `true`
+/// - `timestamp-micros` → `Instant` → [Timestamp] (sub-millisecond micros 
preserved), or `Long` raw epoch
+///   micros when `extractRawTimeValues` is `true`
+/// - `date` → `LocalDate` → `LocalDate`, or `Integer` raw days-since-epoch 
when `extractRawTimeValues` is
+///   `true`
+/// - `time-millis` → `LocalTime` → `LocalTime`, or `Integer` raw 
ms-since-midnight when
+///   `extractRawTimeValues` is `true`
+/// - `time-micros` → `LocalTime` → `LocalTime`, or `Long` raw 
µs-since-midnight when `extractRawTimeValues`
+///   is `true`
+/// - `uuid` → [java.util.UUID] → [java.util.UUID] (always converted; 
downstream type transformer adapts to
+///   the Pinot column type — `STRING` column gets canonical UUID string, 
`BYTES` column gets 16-byte
+///   big-endian form)
 public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
-  private boolean _applyLogicalTypes = true;
+
+  // Avro logical-type names (copied from org.apache.avro.LogicalTypes).
+  private static final String DECIMAL = "decimal";
+  private static final String TIMESTAMP_MILLIS = "timestamp-millis";
+  private static final String TIMESTAMP_MICROS = "timestamp-micros";
+  private static final String DATE = "date";
+  private static final String TIME_MILLIS = "time-millis";
+  private static final String TIME_MICROS = "time-micros";
+  private static final String UUID = "uuid";
+
+  private static final Map<String, Conversion<?>> CONVERSION_MAP = Map.of(
+      DECIMAL, new Conversions.DecimalConversion(),
+      TIMESTAMP_MILLIS, new TimeConversions.TimestampMillisConversion(),
+      TIMESTAMP_MICROS, new TimeConversions.TimestampMicrosConversion(),
+      DATE, new TimeConversions.DateConversion(),
+      TIME_MILLIS, new TimeConversions.TimeMillisConversion(),
+      TIME_MICROS, new TimeConversions.TimeMicrosConversion(),
+      UUID, new Conversions.UUIDConversion()
+  );
+  private static final Set<String> TEMPORAL_LOGICAL_TYPES =
+      Set.of(TIMESTAMP_MILLIS, TIMESTAMP_MICROS, DATE, TIME_MILLIS, 
TIME_MICROS);
+
+  protected boolean _extractRawTimeValues;
 
   @Override
   protected void initConfig(@Nullable RecordExtractorConfig config) {
     if (config instanceof AvroRecordExtractorConfig) {
-      _applyLogicalTypes = ((AvroRecordExtractorConfig) 
config).isEnableLogicalTypes();
+      _extractRawTimeValues = ((AvroRecordExtractorConfig) 
config).isExtractRawTimeValues();
     }
   }
 
   @Override
   public GenericRow extract(GenericRecord from, GenericRow to) {
+    Schema schema = from.getSchema();
     if (_extractAll) {
-      List<Schema.Field> fields = from.getSchema().getFields();
-      for (Schema.Field field : fields) {
+      for (Schema.Field field : schema.getFields()) {
         String fieldName = field.name();
-        Object value = from.get(fieldName);
-        if (_applyLogicalTypes) {
-          value = AvroSchemaUtil.applyLogicalType(field, value);
-        }
-        if (value != null) {
-          value = transformValue(value, field);
-        }
-        to.putValue(fieldName, value);
+        to.putValue(fieldName, convertValue(field.schema(), 
from.get(fieldName)));
       }
     } else {
       for (String fieldName : _fields) {
-        Schema.Field field = from.getSchema().getField(fieldName);
+        Schema.Field field = schema.getField(fieldName);
         if (field != null) {
-          Object value = from.get(field.pos());
-          if (_applyLogicalTypes) {
-            value = AvroSchemaUtil.applyLogicalType(field, value);
-          }
-          if (value != null) {
-            value = transformValue(value, field);
-          }
-          to.putValue(fieldName, value);
+          to.putValue(fieldName, convertValue(field.schema(), 
from.get(field.pos())));
         }
       }
     }
     return to;
   }
 
-  protected Object transformValue(Object value, Schema.Field field) {
-    return convert(value);
+  /// Schema-driven dispatch — produces a contract-compliant value in a single 
walk.
+  /// `null` short-circuits at the top so subclasses overriding [#convertLeaf] 
never see null.
+  @Nullable
+  protected Object convertValue(Schema schema, @Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    schema = resolveUnionSchema(schema, value);
+    switch (schema.getType()) {
+      case ARRAY:
+        // GenericDatumReader emits array<T> as GenericData.Array, which is a 
List.
+        return convertArray(schema, (List<?>) value);
+      case MAP:
+        // GenericDatumReader emits map<T> as a HashMap. Avro keys are always 
strings; the runtime key type
+        // is `Utf8` unless the schema declares `avro.java.string = "String"`. 
We normalize to `String`.
+        return convertMap(schema, (Map<?, ?>) value);
+      case RECORD:
+        return convertRecord(schema, (GenericRecord) value);
+      default:
+        return convertLeaf(schema, value);
+    }
   }
 
-  @Override
-  protected boolean isRecord(Object value) {
-    return value instanceof GenericRecord;
+  /// Converts a leaf (non-container) Avro value to its Pinot contract type. 
Subclasses override to handle
+  /// format-specific extensions (e.g. parquet-avro's INT96 surfaced as a 
`fixed(12)` with a sentinel doc).
+  /// The default handles the Avro spec — primitives + the seven registered 
logical types.
+  ///
+  /// Per the Avro spec, only STRING / BYTES / FIXED / INT / LONG can carry a 
logical-type annotation; the
+  /// other primitive types skip the lookup entirely. When a logical-type 
conversion runs, `Instant`
+  /// results (`timestamp-millis` / `timestamp-micros`) are post-mapped to 
[Timestamp] per the Pinot
+  /// contract. When [#_extractRawTimeValues] is set, temporal logical types 
pass through the raw
+  /// underlying integer.
+  protected Object convertLeaf(Schema schema, Object value) {
+    Schema.Type type = schema.getType();
+    switch (type) {
+      case BOOLEAN:
+      case FLOAT:
+      case DOUBLE:
+        // The Avro spec defines no logical types for these primitives — skip 
the logical-type lookup.
+        return value;
+      case ENUM:
+        // No logical types; `EnumSymbol.toString()` returns the canonical 
enum-symbol string.
+        return value.toString();
+      default:
+        // INT / LONG / STRING / BYTES / FIXED — fall through to logical-type 
handling below.
+        break;
+    }
+    LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+    if (logicalType != null) {
+      String name = logicalType.getName();
+      if (!(_extractRawTimeValues && TEMPORAL_LOGICAL_TYPES.contains(name))) {
+        Conversion<?> conversion = CONVERSION_MAP.get(name);
+        if (conversion != null) {
+          Object converted = Conversions.convertToLogicalType(value, schema, 
logicalType, conversion);
+          return converted instanceof Instant ? Timestamp.from((Instant) 
converted) : converted;
+        }
+      }
+    }
+    // No logical-type conversion fired — fall through to physical-type 
normalization.
+    switch (type) {
+      case STRING:
+        // `Utf8.toString()` returns the decoded String; an already-`String` 
value is a no-op.
+        return value.toString();
+      case BYTES:
+        // ByteBuffer might be reused by the reader. Slice to avoid advancing 
the original buffer's position.
+        ByteBuffer slice = ((ByteBuffer) value).slice();
+        byte[] bytes = new byte[slice.limit()];
+        slice.get(bytes);
+        return bytes;
+      case FIXED:
+        return ((GenericFixed) value).bytes();
+      default:
+        // INT / LONG with no logical type, or temporal raw-mode pass-through.
+        return value;
+    }
   }
 
-  @Override
-  protected Map<Object, Object> convertRecord(Object value) {
-    GenericRecord record = (GenericRecord) value;
-    List<Schema.Field> fields = record.getSchema().getFields();
-    Map<Object, Object> convertedMap = 
Maps.newHashMapWithExpectedSize(fields.size());
+  private Object[] convertArray(Schema schema, List<?> list) {
+    Schema elementSchema = schema.getElementType();
+    int numElements = list.size();
+    Object[] result = new Object[numElements];
+    for (int i = 0; i < numElements; i++) {
+      result[i] = convertValue(elementSchema, list.get(i));
+    }
+    return result;
+  }
+
+  private Map<String, Object> convertMap(Schema schema, Map<?, ?> map) {
+    Schema valueSchema = schema.getValueType();
+    Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      result.put(entry.getKey().toString(), convertValue(valueSchema, 
entry.getValue()));
+    }
+    return result;
+  }
+
+  private Map<String, Object> convertRecord(Schema schema, GenericRecord 
record) {
+    List<Schema.Field> fields = schema.getFields();
+    Map<String, Object> result = 
Maps.newHashMapWithExpectedSize(fields.size());
     for (Schema.Field field : fields) {
-      String fieldName = field.name();
-      Object fieldValue = record.get(fieldName);
-      Object convertedValue = fieldValue != null ? transformValue(fieldValue, 
field) : null;
-      convertedMap.put(fieldName, convertedValue);
+      result.put(field.name(), convertValue(field.schema(), 
record.get(field.name())));
     }
-    return convertedMap;
+    return result;
   }
 
-  /// Adds Avro-specific handling: [GenericFixed] (`fixed`) → `byte[]`. 
Everything else delegates to the base
-  /// (see class Javadoc for the full Avro-source → Java-output matrix).
-  @Override
-  protected Object convertSingleValue(Object value) {
-    if (value instanceof GenericFixed) {
-      return ((GenericFixed) value).bytes();
+  /// Picks the union branch matching the runtime value via Avro's standard 
union resolution
+  /// ([GenericData#resolveUnion]) — the same code path `GenericDatumWriter` 
uses, dispatching by
+  /// the runtime Java type's schema name. Avro disallows duplicate primitive 
types in a single
+  /// union, so the resolution is unambiguous. Throws 
[org.apache.avro.UnresolvedUnionException]
+  /// when the value matches no branch (a real schema-vs-data mismatch).
+  private static Schema resolveUnionSchema(Schema schema, Object value) {
+    if (!schema.isUnion()) {
+      return schema;
     }
-    return super.convertSingleValue(value);
+    int index = GenericData.get().resolveUnion(schema, value);

Review Comment:
   `resolveUnionSchema()` now calls `GenericData.get().resolveUnion(...)`, but 
the Avro file reader is built with `AvroUtils.GENERIC_DATA`, which registers 
logical-type conversions such as decimal. For a field like `union[null, 
bytes(decimal)]`, the runtime value coming from `GenericDatumReader` is 
therefore a `BigDecimal`. The default `GenericData` cannot match that value to 
the `bytes` branch, so extraction of nullable logical-type fields that used to 
work now fails with `UnresolvedUnionException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]