This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 54601e27cdd960048644d609e5ed839758100ce4 Author: voonhous <[email protected]> AuthorDate: Thu May 7 17:29:41 2026 +0800 fix(hive): Tolerate pruned ArrayWritable in nested BLOB projection (#18581) Fixes issue: #18577 When Hive's FetchOperator pushes nested column projection (e.g. `SELECT blob_data.reference.external_path`) through Parquet via `hive.io.file.readNestedColumn.paths`, the reader returns a compacted ArrayWritable holding only the projected sub-fields in low slots, while oldSchema stays the full 3-field canonical BLOB (BlobLogicalType.validate rejects partial field lists; pruneDataSchema deliberately preserves the canonical shape). Positional indexing into the compacted array then throws ArrayIndexOutOfBoundsException (AIOBE). A bounds guard alone is not enough: an index that happens to be in range reads the wrong sub-field, so the rewrite must resolve each physical slot by field name rather than by canonical position. Introduce a projection-aware rewrite path: - HoodieProjectionMask (new) - immutable per-level descriptor of the physical layout. isCanonicalAtThisLevel() means schema positions apply; otherwise physicalIndexOf / physicalOrder map field names to physical slots. - HoodieColumnProjectionUtils.buildNestedProjectionMask() - parses hive.io.file.readNestedColumn.paths, walks RECORD / BLOB / VARIANT, returns the matching mask (or all() when projection is absent). - HiveHoodieReaderContext threads the mask into a new 5-arg rewriteRecordWithNewSchema overload. - HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal branches on the mask: - rewriteCanonicalRecord - legacy positional logic with a defensive oldField.pos() < arrayLength guard. - rewriteCompactedRecord - iterates physicalOrder(), resolves each physical slot to its schema field by name, and writes the rewritten value back at the same physical (compacted) position, because Hive's downstream ArrayWritableObjectInspector is built from the projected schema and expects the compacted layout. The compacted path is the primary fix; the canonical-path bounds guard is a defensive fallback. 
Tests: TestHoodieColumnProjectionUtils covers mask construction; TestHoodieArrayWritableSchemaUtils covers the AIOBE reproducer, compacted round-trip, and a canonical-shape regression. HoodieSchemaTestUtils gains createPlainBlobRecord and createPlainVariantRecord helpers (variant helper for upcoming VARIANT parity). --- .../hudi/common/schema/HoodieProjectionMask.java | 199 +++++++++++++++++++++ .../hudi/common/schema/HoodieSchemaTestUtils.java | 4 +- .../hudi/hadoop/HiveHoodieReaderContext.java | 12 +- .../hudi/hadoop/HoodieColumnProjectionUtils.java | 99 ++++++++++ .../utils/HoodieArrayWritableSchemaUtils.java | 179 ++++++++++++------ .../hadoop/TestHoodieColumnProjectionUtils.java | 92 ++++++++++ .../utils/TestHoodieArrayWritableSchemaUtils.java | 108 ++++++++++- 7 files changed, 630 insertions(+), 63 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java new file mode 100644 index 000000000000..0f0a8de07816 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.schema; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; + +/** + * Describes the physical field layout of an incoming columnar record when an upstream + * reader (e.g. Hive's Parquet reader) has compacted nested struct projection — that is, + * dropped non-selected sub-fields and shifted the survivors into low slots. + * + * <p>The canonical (Hudi-side) schema declares the full ordered field list. When the + * upstream reader projects only a subset of a struct, the resulting array's slots no + * longer correspond to canonical positions. This mask captures, per record level, the + * physical layout, so position-based access can be remapped. + * + * <p>A mask carries two pieces of information for the level it describes: + * <ol> + * <li>Whether THIS level is canonical-shaped or compacted. Hive's Parquet reader pads + * non-projected top-level columns with nulls (canonical), but compacts non-projected + * sub-fields of struct columns (compacted). + * <li>Optional per-child masks. A canonical level can still descend into a child whose + * sub-record is compacted — for example, the top-level row is canonical but the + * blob_data sub-record's interior is compacted by Hive's nested-projection pushdown. + * </ol> + * + * <p>{@link #all()} is the no-op identity used everywhere outside the projected-record + * path. {@link #canonicalWith(Map)} preserves canonical positions at this level while + * supplying compacted descent for specific children. + */ +public final class HoodieProjectionMask { + + private static final HoodieProjectionMask ALL = new HoodieProjectionMask(false, Collections.emptyMap(), Collections.emptyMap()); + + // True when THIS level's ArrayWritable is compacted to only the projected fields. 
+ // False when this level is canonical-shaped (full positions per schema, with nulls for unprojected). + private final boolean compactedAtThisLevel; + // Field name -> position in the projected ArrayWritable. Only meaningful when compactedAtThisLevel. + private final Map<String, Integer> physicalIndex; + // Per-child mask used during descent. May be non-empty even when this level is canonical. + private final Map<String, HoodieProjectionMask> childMasks; + + private HoodieProjectionMask(boolean compactedAtThisLevel, Map<String, Integer> physicalIndex, Map<String, HoodieProjectionMask> childMasks) { + this.compactedAtThisLevel = compactedAtThisLevel; + this.physicalIndex = physicalIndex; + this.childMasks = childMasks; + } + + public static HoodieProjectionMask all() { + return ALL; + } + + /** + * Mask whose THIS level is canonical-shaped but whose listed children carry their own + * (typically compacted) descent masks. Use this at boundaries where the outer record + * is full but specific sub-records have been compacted by the reader (the typical + * Hive nested-projection-on-BLOB shape). + */ + public static HoodieProjectionMask canonicalWith(Map<String, HoodieProjectionMask> childMasks) { + if (childMasks == null || childMasks.isEmpty()) { + return ALL; + } + return new HoodieProjectionMask(false, Collections.emptyMap(), Collections.unmodifiableMap(new LinkedHashMap<>(childMasks))); + } + + /** + * True when no remapping or descent override applies — the rewrite can use canonical + * positions everywhere below this point. + */ + public boolean isAll() { + return !compactedAtThisLevel && childMasks.isEmpty(); + } + + /** + * True when the level this mask describes is canonical-shaped (positions match the + * schema). Children may still carry compacted sub-masks. + */ + public boolean isCanonicalAtThisLevel() { + return !compactedAtThisLevel; + } + + /** + * Position of {@code fieldName} in the compacted ArrayWritable at this level. 
Empty + * when this level is canonical or the field was not projected. + */ + public OptionalInt physicalIndexOf(String fieldName) { + if (!compactedAtThisLevel) { + return OptionalInt.empty(); + } + Integer idx = physicalIndex.get(fieldName); + return idx == null ? OptionalInt.empty() : OptionalInt.of(idx); + } + + /** + * Field names of this compacted level, in physical-position order. Empty when the + * level is canonical. + */ + public List<String> physicalOrder() { + if (!compactedAtThisLevel) { + return Collections.emptyList(); + } + return new ArrayList<>(physicalIndex.keySet()); + } + + /** + * Mask to use when recursing into {@code fieldName}'s sub-record. Falls back to + * {@link #all()} when no override is registered for this child. + */ + public HoodieProjectionMask childOrAll(String fieldName) { + HoodieProjectionMask child = childMasks.get(fieldName); + return child == null ? ALL : child; + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builds a mask describing a compacted level. Insertion order defines the physical + * field order in the projected ArrayWritable. 
+ */ + public static final class Builder { + private final LinkedHashMap<String, HoodieProjectionMask> children = new LinkedHashMap<>(); + + public Builder field(String name) { + return field(name, ALL); + } + + public Builder field(String name, HoodieProjectionMask child) { + children.put(name, Objects.requireNonNull(child, "child mask")); + return this; + } + + public HoodieProjectionMask build() { + if (children.isEmpty()) { + return ALL; + } + LinkedHashMap<String, HoodieProjectionMask> childrenCopy = new LinkedHashMap<>(children); + LinkedHashMap<String, Integer> index = new LinkedHashMap<>(children.size()); + int i = 0; + for (String name : childrenCopy.keySet()) { + index.put(name, i++); + } + return new HoodieProjectionMask( + true, + Collections.unmodifiableMap(index), + Collections.unmodifiableMap(childrenCopy)); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof HoodieProjectionMask)) { + return false; + } + HoodieProjectionMask other = (HoodieProjectionMask) o; + return compactedAtThisLevel == other.compactedAtThisLevel + && physicalIndex.equals(other.physicalIndex) + && childMasks.equals(other.childMasks); + } + + @Override + public int hashCode() { + return Objects.hash(compactedAtThisLevel, physicalIndex, childMasks); + } + + @Override + public String toString() { + if (isAll()) { + return "HoodieProjectionMask{ALL}"; + } + return "HoodieProjectionMask{compacted=" + compactedAtThisLevel + ",children=" + childMasks + "}"; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java index faba63d66690..6052639d209e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java @@ -75,7 +75,7 @@ public class HoodieSchemaTestUtils { * was 
stripped by Spark's TableOutputResolver Cast. */ public static HoodieSchema createPlainBlobRecord(String recordName) { - HoodieSchema reference = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList( + HoodieSchema referenceSchema = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList( HoodieSchemaField.of("external_path", HoodieSchema.create(HoodieSchemaType.STRING), null, null), HoodieSchemaField.of("offset", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null), HoodieSchemaField.of("length", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null), @@ -83,7 +83,7 @@ public class HoodieSchemaTestUtils { return HoodieSchema.createRecord(recordName, null, null, false, Arrays.asList( HoodieSchemaField.of("type", HoodieSchema.create(HoodieSchemaType.STRING), null, null), HoodieSchemaField.of("data", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.BYTES)), null, JsonProperties.NULL_VALUE), - HoodieSchemaField.of("reference", HoodieSchema.createNullable(reference), null, JsonProperties.NULL_VALUE))); + HoodieSchemaField.of("reference", HoodieSchema.createNullable(referenceSchema), null, JsonProperties.NULL_VALUE))); } /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java index 9e7994f0147b..d9e99742674d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.schema.HoodieProjectionMask; import org.apache.hudi.common.schema.HoodieSchema; import 
org.apache.hudi.common.schema.HoodieSchemaCompatibility; import org.apache.hudi.common.schema.HoodieSchemaField; @@ -36,6 +37,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.hadoop.utils.HiveTypeUtils; +import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.storage.HoodieStorage; @@ -174,11 +176,15 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> firstRecordReader = recordReader; } ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader); - if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) { + HoodieProjectionMask physicalMask = HoodieColumnProjectionUtils.buildNestedProjectionMask(jobConfCopy, modifiedDataSchema); + if (physicalMask.isAll() && HoodieSchemaCompatibility.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) { return recordIterator; } - // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns - return new CloseableMappingIterator<>(recordIterator, recordContext.projectRecord(modifiedDataSchema, requiredSchema)); + // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns; + // physicalMask additionally tells the rewrite where struct sub-fields landed when Hive's + // Parquet reader compacted nested-column projection. 
+ return new CloseableMappingIterator<>(recordIterator, + record -> HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, modifiedDataSchema, requiredSchema, Collections.emptyMap(), physicalMask)); } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java index 8971220d1330..c89d37f1c306 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java @@ -18,6 +18,10 @@ package org.apache.hudi.hadoop; +import org.apache.hudi.common.schema.HoodieProjectionMask; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -37,7 +41,10 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -59,6 +66,13 @@ public class HoodieColumnProjectionUtils { * the column a's path is c.a and b's path is c.b */ public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; + /** + * Hive's Parquet reader (DataWritableReadSupport) reads this conf to compact nested + * struct projection. ProjectionPusher in Hive's FetchOperator sets it before invoking + * the record reader; Hudi only needs to consume it. Format: comma-separated dotted + * paths from root to leaf, e.g. {@code blob_data.reference,blob_data.reference.external_path}. 
+ */ + public static final String READ_NESTED_COLUMN_PATH_CONF_STR = "hive.io.file.readNestedColumn.paths"; private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = ""; private static final String READ_COLUMN_NAMES_CONF_STR_DEFAULT = ""; @@ -165,4 +179,89 @@ public class HoodieColumnProjectionUtils { return false; } } + + /** + * Build a {@link HoodieProjectionMask} reflecting Hive's nested-column projection for + * {@code dataSchema}. Hive's Parquet reader pads non-projected top-level columns with + * nulls (canonical layout) but compacts non-projected sub-fields of struct columns. + * The returned mask preserves canonical positions at the row level and supplies a + * compacted descent mask for each top-level column whose interior was pruned. + * + * <p>Returns {@link HoodieProjectionMask#all()} when the conf is empty or no struct + * column has a sub-field projection. + */ + public static HoodieProjectionMask buildNestedProjectionMask(Configuration conf, HoodieSchema dataSchema) { + String paths = conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, ""); + if (paths.isEmpty()) { + return HoodieProjectionMask.all(); + } + // Group nested paths by their top-level column. Top-level-only paths (single + // component, e.g. "blob_data") are ignored here — Hive does not compact at the + // top level, so canonical positions still apply. 
+ Map<String, List<List<String>>> pathsByField = new LinkedHashMap<>(); + for (String path : paths.split(",")) { + String trimmed = path.trim(); + if (trimmed.isEmpty()) { + continue; + } + String[] components = trimmed.split("\\."); + if (components.length < 2) { + continue; + } + List<String> tail = new ArrayList<>(components.length - 1); + for (int i = 1; i < components.length; i++) { + tail.add(components[i].toLowerCase(Locale.ROOT)); + } + String head = components[0].toLowerCase(Locale.ROOT); + pathsByField.computeIfAbsent(head, k -> new ArrayList<>()).add(tail); + } + if (pathsByField.isEmpty()) { + return HoodieProjectionMask.all(); + } + HoodieSchema rowSchema = dataSchema.getNonNullType(); + Map<String, HoodieProjectionMask> childMasks = new LinkedHashMap<>(); + for (HoodieSchemaField topField : rowSchema.getFields()) { + String key = topField.name().toLowerCase(Locale.ROOT); + List<List<String>> nestedPaths = pathsByField.get(key); + if (nestedPaths == null) { + continue; + } + HoodieProjectionMask childMask = buildMaskForRecord(topField.schema(), nestedPaths); + if (!childMask.isAll()) { + childMasks.put(topField.name(), childMask); + } + } + return HoodieProjectionMask.canonicalWith(childMasks); + } + + private static HoodieProjectionMask buildMaskForRecord(HoodieSchema schema, List<List<String>> paths) { + HoodieSchema recordSchema = schema.getNonNullType(); + HoodieSchemaType type = recordSchema.getType(); + if (type != HoodieSchemaType.RECORD && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.VARIANT) { + return HoodieProjectionMask.all(); + } + // Group remaining components by first-level field name; lower-case to match + // Hive's lowercased column names. + Map<String, List<List<String>>> pathsByField = new LinkedHashMap<>(); + for (List<String> path : paths) { + if (path.isEmpty()) { + // Whole sub-record projected as-is — no compaction below this point. 
+ return HoodieProjectionMask.all(); + } + String head = path.get(0); + List<String> tail = path.subList(1, path.size()); + pathsByField.computeIfAbsent(head, k -> new ArrayList<>()).add(tail); + } + HoodieProjectionMask.Builder builder = HoodieProjectionMask.builder(); + for (HoodieSchemaField field : recordSchema.getFields()) { + String key = field.name().toLowerCase(Locale.ROOT); + List<List<String>> childPaths = pathsByField.get(key); + if (childPaths == null) { + continue; + } + HoodieProjectionMask childMask = buildMaskForRecord(field.schema(), childPaths); + builder.field(field.name(), childMask); + } + return builder.build(); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java index a468de4f52a5..365f7a44cc3a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.utils; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.schema.HoodieProjectionMask; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaCompatibility; import org.apache.hudi.common.schema.HoodieSchemaField; @@ -63,22 +64,40 @@ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; public class HoodieArrayWritableSchemaUtils { public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols) { - return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>()); + return rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, HoodieProjectionMask.all()); } - private static Writable 
rewriteRecordWithNewSchema(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) { + /** + * Rewrite variant aware of an upstream reader having compacted nested struct projection. + * + * <p>{@code physicalMask} describes the actual ArrayWritable layout per record level: + * which sub-fields are present and at what physical index. Pass {@link HoodieProjectionMask#all()} + * when the input is canonical-shaped — the rewrite then reduces to position-by-canonical-pos + * access exactly like the legacy 4-arg overload. + */ + public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, HoodieSchema oldSchema, HoodieSchema newSchema, + Map<String, String> renameCols, HoodieProjectionMask physicalMask) { + return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>(), physicalMask); + } + + private static Writable rewriteRecordWithNewSchema(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, + Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) { if (writable == null) { return null; } HoodieSchema oldSchemaNonNull = oldSchema.getNonNullType(); HoodieSchema newSchemaNonNull = newSchema.getNonNullType(); - if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchemaNonNull, newSchemaNonNull)) { + // Only short-circuit when the input is in canonical shape; an upstream-projected + // ArrayWritable needs the recursive rewrite to remap slots, even when the schema + // pair would otherwise be equivalent. 
+ if (physicalMask.isAll() && HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchemaNonNull, newSchemaNonNull)) { return writable; } - return rewriteRecordWithNewSchemaInternal(writable, oldSchemaNonNull, newSchemaNonNull, renameCols, fieldNames); + return rewriteRecordWithNewSchemaInternal(writable, oldSchemaNonNull, newSchemaNonNull, renameCols, fieldNames, physicalMask); } - private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) { + private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, + Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) { switch (newSchema.getType()) { // BLOB/VARIANT are physically records; share the RECORD field-by-name rewrite. case BLOB: @@ -87,55 +106,10 @@ public class HoodieArrayWritableSchemaUtils { if (!(writable instanceof ArrayWritable)) { throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a record", writable.getClass().getName())); } - - ArrayWritable arrayWritable = (ArrayWritable) writable; - List<HoodieSchemaField> fields = newSchema.getFields(); - // projection will keep the size from the "from" schema because it gets recycled - // and if the size changes the reader will fail - boolean noFieldsRenaming = renameCols.isEmpty(); - String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames); - Writable[] values = new Writable[Math.max(fields.size(), arrayWritable.get().length)]; - for (int i = 0; i < fields.size(); i++) { - HoodieSchemaField newField = newSchema.getFields().get(i); - String newFieldName = newField.name(); - fieldNames.push(newFieldName); - Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming - ? 
oldSchema.getField(newFieldName) - : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, newFieldName, renameCols)); - if (oldFieldOpt.isPresent()) { - HoodieSchemaField oldField = oldFieldOpt.get(); - values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames); - } else if (newField.defaultVal().isPresent() && newField.defaultVal().get().equals(HoodieSchema.NULL_VALUE)) { - values[i] = NullWritable.get(); - } else if (!newField.schema().isNullable() && newField.defaultVal().isEmpty()) { - throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value and is non-nullable"); - } else if (newField.defaultVal().isPresent()) { - switch (newField.getNonNullSchema().getType()) { - case BOOLEAN: - values[i] = new BooleanWritable((Boolean) newField.defaultVal().get()); - break; - case INT: - values[i] = new IntWritable((Integer) newField.defaultVal().get()); - break; - case LONG: - values[i] = new LongWritable((Long) newField.defaultVal().get()); - break; - case FLOAT: - values[i] = new FloatWritable((Float) newField.defaultVal().get()); - break; - case DOUBLE: - values[i] = new DoubleWritable((Double) newField.defaultVal().get()); - break; - case STRING: - values[i] = new Text(newField.defaultVal().toString()); - break; - default: - throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value"); - } - } - fieldNames.pop(); + if (physicalMask.isCanonicalAtThisLevel()) { + return rewriteCanonicalRecord((ArrayWritable) writable, oldSchema, newSchema, renameCols, fieldNames, physicalMask); } - return new ArrayWritable(Writable.class, values); + return rewriteCompactedRecord((ArrayWritable) writable, oldSchema, newSchema, renameCols, fieldNames, physicalMask); case ENUM: if ((writable instanceof BytesWritable)) { @@ -155,7 +129,7 @@ public class HoodieArrayWritableSchemaUtils { ArrayWritable array = (ArrayWritable) 
writable; fieldNames.push("element"); for (int i = 0; i < array.get().length; i++) { - array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames); + array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, HoodieProjectionMask.all()); } fieldNames.pop(); return array; @@ -167,7 +141,8 @@ public class HoodieArrayWritableSchemaUtils { fieldNames.push("value"); for (int i = 0; i < map.get().length; i++) { Writable mapEntry = map.get()[i]; - ((ArrayWritable) mapEntry).get()[1] = rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames); + ((ArrayWritable) mapEntry).get()[1] = + rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames, HoodieProjectionMask.all()); } return map; @@ -179,6 +154,100 @@ public class HoodieArrayWritableSchemaUtils { } } + /** + * Canonical record rewrite: input ArrayWritable matches {@code oldSchema}'s field + * positions (with possibly trailing nulls); output is sized to + * {@code max(newSchema field count, input array length)} so any trailing slots Hive + * left in a recycled row buffer are preserved, while schema-evolution semantics still + * apply to the leading {@code newSchema}-sized region (default values, null padding, + * non-nullable-without-default throws). 
+ */ + private static Writable rewriteCanonicalRecord(ArrayWritable arrayWritable, HoodieSchema oldSchema, HoodieSchema newSchema, + Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) { + List<HoodieSchemaField> fields = newSchema.getFields(); + boolean noFieldsRenaming = renameCols.isEmpty(); + String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames); + Writable[] values = new Writable[Math.max(fields.size(), arrayWritable.get().length)]; + for (int i = 0; i < fields.size(); i++) { + HoodieSchemaField newField = fields.get(i); + String newFieldName = newField.name(); + fieldNames.push(newFieldName); + Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming + ? oldSchema.getField(newFieldName) + : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, newFieldName, renameCols)); + + // Bounds-check because some upstream readers may still hand back arrays shorter + // than the schema declares (no projection mask supplied). + if (oldFieldOpt.isPresent() && oldFieldOpt.get().pos() < arrayWritable.get().length) { + HoodieSchemaField oldField = oldFieldOpt.get(); + HoodieProjectionMask childMask = physicalMask.childOrAll(oldField.name()); + values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames, childMask); + } else if (newField.defaultVal().isPresent() && newField.defaultVal().get().equals(HoodieSchema.NULL_VALUE)) { + values[i] = NullWritable.get(); + } else if (!newField.schema().isNullable() && newField.defaultVal().isEmpty()) { + throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value and is non-nullable"); + } else if (newField.defaultVal().isPresent()) { + switch (newField.getNonNullSchema().getType()) { + case BOOLEAN: + values[i] = new BooleanWritable((Boolean) newField.defaultVal().get()); + break; + case INT: + values[i] = new IntWritable((Integer) newField.defaultVal().get()); 
+ break; + case LONG: + values[i] = new LongWritable((Long) newField.defaultVal().get()); + break; + case FLOAT: + values[i] = new FloatWritable((Float) newField.defaultVal().get()); + break; + case DOUBLE: + values[i] = new DoubleWritable((Double) newField.defaultVal().get()); + break; + case STRING: + values[i] = new Text(newField.defaultVal().toString()); + break; + default: + throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value"); + } + } + fieldNames.pop(); + } + return new ArrayWritable(Writable.class, values); + } + + /** + * Compacted record rewrite: input ArrayWritable matches the upstream reader's + * projected schema (only the projected sub-fields, in declared order). The output + * preserves the same physical layout — Hive's downstream {@code ArrayWritableObjectInspector} + * was constructed from the projected schema and expects compacted positions. + * Per-element schema conversion still applies (e.g. plain STRING → canonical ENUM). + */ + private static Writable rewriteCompactedRecord(ArrayWritable arrayWritable, HoodieSchema oldSchema, HoodieSchema newSchema, + Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) { + Writable[] inputs = arrayWritable.get(); + Writable[] values = new Writable[inputs.length]; + List<String> order = physicalMask.physicalOrder(); + for (int physIdx = 0; physIdx < order.size(); physIdx++) { + if (physIdx >= inputs.length) { + // Mask claims a field at a position the reader didn't fill; defensive — leave null. + continue; + } + String physicalFieldName = order.get(physIdx); + Option<HoodieSchemaField> oldFieldOpt = oldSchema.getField(physicalFieldName); + Option<HoodieSchemaField> newFieldOpt = newSchema.getField(physicalFieldName); + if (oldFieldOpt.isEmpty() || newFieldOpt.isEmpty()) { + // The reader returned a sub-field neither side knows about — pass through. 
+ values[physIdx] = inputs[physIdx]; + continue; + } + fieldNames.push(physicalFieldName); + HoodieProjectionMask childMask = physicalMask.childOrAll(physicalFieldName); + values[physIdx] = rewriteRecordWithNewSchema(inputs[physIdx], oldFieldOpt.get().schema(), newFieldOpt.get().schema(), renameCols, fieldNames, childMask); + fieldNames.pop(); + } + return new ArrayWritable(Writable.class, values); + } + public static Writable rewritePrimaryType(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema) { if (oldSchema.getType() == newSchema.getType()) { switch (oldSchema.getType()) { diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java index e7467acef5d0..fcf5ac44efba 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java @@ -18,10 +18,21 @@ package org.apache.hudi.hadoop; +import org.apache.hudi.common.schema.HoodieProjectionMask; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaTestUtils; +import org.apache.hudi.common.schema.HoodieSchemaType; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.OptionalInt; + +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -73,4 +84,85 @@ public class TestHoodieColumnProjectionUtils { TypeInfo typeInfo11 = TypeInfoUtils.getTypeInfosFromTypeString(col11).get(0); 
assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo11)); } + + @Test + void testBuildNestedProjectionMaskEmptyConfYieldsAll() { + // No nested-column paths configured: the builder must return the all() mask. + HoodieSchema row = rowSchemaWithBlobColumn(); + Configuration conf = new Configuration(); + HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row); + assertTrue(mask.isAll()); + } + + @Test + void testBuildNestedProjectionMaskTopLevelOnlyYieldsAll() { + // Top-level pruning never compacts; only nested paths produce a mask. + HoodieSchema row = rowSchemaWithBlobColumn(); + Configuration conf = new Configuration(); + conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data,id"); + HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row); + assertTrue(mask.isAll()); + } + + @Test + void testBuildNestedProjectionMaskForBlobReferenceProjection() { + // Only the BLOB level compacts (to [reference]); the row level stays canonical and reference itself is unpruned. + HoodieSchema row = rowSchemaWithBlobColumn(); + Configuration conf = new Configuration(); + conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference"); + HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row); + + assertFalse(mask.isAll()); + assertTrue(mask.isCanonicalAtThisLevel()); + HoodieProjectionMask blobChild = mask.childOrAll("blob_data"); + assertFalse(blobChild.isAll()); + assertFalse(blobChild.isCanonicalAtThisLevel()); + assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("reference")); + assertEquals(OptionalInt.empty(), blobChild.physicalIndexOf("type")); + assertEquals(OptionalInt.empty(), blobChild.physicalIndexOf("data")); + assertTrue(blobChild.childOrAll("reference").isAll()); + } + + @Test + void testBuildNestedProjectionMaskForExternalPathProjection() { + // Two nested levels compact: blob_data to [reference], and reference to [external_path]. + HoodieSchema row = rowSchemaWithBlobColumn(); + Configuration conf = new Configuration(); + conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference.external_path"); + HoodieProjectionMask mask = 
HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row); + + HoodieProjectionMask blobChild = mask.childOrAll("blob_data"); + assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("reference")); + HoodieProjectionMask refChild = blobChild.childOrAll("reference"); + assertFalse(refChild.isAll()); + assertEquals(OptionalInt.of(0), refChild.physicalIndexOf("external_path")); + assertEquals(OptionalInt.empty(), refChild.physicalIndexOf("offset")); + } + + @Test + void testBuildNestedProjectionMaskOrdersByDeclaredSchema() { + // Physical order follows the schema, not the conf-path order. + HoodieSchema row = rowSchemaWithBlobColumn(); + Configuration conf = new Configuration(); + conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference,blob_data.type"); + HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row); + + HoodieProjectionMask blobChild = mask.childOrAll("blob_data"); + assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("type")); + assertEquals(OptionalInt.of(1), blobChild.physicalIndexOf("reference")); + } + + @Test + void testBuildNestedProjectionMaskCaseInsensitive() { + // Mixed-case conf paths must still resolve to the lower-case schema field names. + HoodieSchema row = rowSchemaWithBlobColumn(); + Configuration conf = new Configuration(); + conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "BLOB_DATA.Reference.External_Path"); + HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row); + + assertFalse(mask.childOrAll("blob_data").isAll()); + assertEquals(OptionalInt.of(0), mask.childOrAll("blob_data").physicalIndexOf("reference")); + } + + // Test row schema: id (LONG) plus blob_data built with HoodieSchemaTestUtils.createPlainBlobRecord. + private static HoodieSchema rowSchemaWithBlobColumn() { + HoodieSchemaField idField = HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG), null, null); + HoodieSchemaField blobField = HoodieSchemaField.of("blob_data", HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"), null, null); + return HoodieSchema.createRecord("test_row", 
null, null, false, Arrays.asList(idField, blobField)); + } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java index 16188b520074..ff5bdbc18694 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.utils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.engine.RecordContext; +import org.apache.hudi.common.schema.HoodieProjectionMask; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaTestUtils; import org.apache.hudi.common.schema.HoodieSchemaType; @@ -334,6 +335,107 @@ public class TestHoodieArrayWritableSchemaUtils { assertSame(bytes, rewritten); } + @Test + void testRewriteBlobWithPrunedArrayWritableFillsMissingFieldsWithNull() { + // No mask passed: the one-slot input is padded to the 3 slots of HoodieSchema.createBlob(), missing ones as NullWritable. + HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"); + HoodieSchema newSchema = HoodieSchema.createBlob(); + ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] { + new Text("INLINE") + }); + + ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + prunedRecord, oldSchema, newSchema, Collections.emptyMap()); + + assertEquals(3, rewritten.get().length); + assertInstanceOf(BytesWritable.class, rewritten.get()[0]); + assertEquals("INLINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes())); + assertEquals(NullWritable.get(), rewritten.get()[1]); + assertEquals(NullWritable.get(), rewritten.get()[2]); + } + + @Test + void testRewriteBlobWithPrunedReferenceProjection() { + // SELECT blob_data.reference: compacted-shape input must round-trip unchanged so + // Hive's projected-schema ObjectInspector finds 
reference at slot 0. + HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"); + HoodieSchema newSchema = HoodieSchema.createBlob(); + ArrayWritable referenceSubstruct = new ArrayWritable(Writable.class, new Writable[] { + new Text("blobs/updated-1"), + new LongWritable(0L), + new LongWritable(11L), + new BooleanWritable(false) + }); + ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] { + referenceSubstruct + }); + HoodieProjectionMask mask = HoodieProjectionMask.builder().field("reference").build(); + + ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask); + + assertEquals(1, rewritten.get().length); + ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[0]; + assertEquals(4, rewrittenRef.get().length); + assertEquals(new Text("blobs/updated-1"), rewrittenRef.get()[0]); + assertEquals(new LongWritable(0L), rewrittenRef.get()[1]); + assertEquals(new LongWritable(11L), rewrittenRef.get()[2]); + assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]); + } + + @Test + void testRewriteBlobWithPrunedReferenceExternalPathProjection() { + // SELECT blob_data.reference.external_path — reproducer for the CCE at :149. + // Two-level mask: the BLOB level keeps only reference, and reference keeps only external_path.
+ HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"); + HoodieSchema newSchema = HoodieSchema.createBlob(); + ArrayWritable prunedReference = new ArrayWritable(Writable.class, new Writable[] { + new Text("blobs/updated-1") + }); + ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] { + prunedReference + }); + HoodieProjectionMask mask = HoodieProjectionMask.builder() + .field("reference", HoodieProjectionMask.builder().field("external_path").build()) + .build(); + + ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask); + + assertEquals(1, rewritten.get().length); + ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[0]; + assertEquals(1, rewrittenRef.get().length); + assertEquals(new Text("blobs/updated-1"), rewrittenRef.get()[0]); + } + + @Test + void testRewriteBlobWithCanonicalShapeStillWorksAfterMaskWiring() { + // Regression guard: mask=all() must keep the legacy canonical-shape behavior.
+ HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"); + HoodieSchema newSchema = HoodieSchema.createBlob(); + ArrayWritable referenceSubstruct = new ArrayWritable(Writable.class, new Writable[] { + new Text("blobs/path-1"), + new LongWritable(0L), + new LongWritable(11L), + new BooleanWritable(false) + }); + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[] { + new Text("OUT_OF_LINE"), + NullWritable.get(), + referenceSubstruct + }); + + ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + record, oldSchema, newSchema, Collections.emptyMap(), HoodieProjectionMask.all()); + + assertInstanceOf(BytesWritable.class, rewritten.get()[0]); + assertEquals("OUT_OF_LINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes())); + assertEquals(NullWritable.get(), rewritten.get()[1]); + ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[2]; + assertEquals(new Text("blobs/path-1"), rewrittenRef.get()[0]); + assertEquals(new LongWritable(0L), rewrittenRef.get()[1]); + assertEquals(new LongWritable(11L), rewrittenRef.get()[2]); + assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]); + } + private void validateRewriteWithAvro( Writable oldWritable, HoodieSchema oldSchema, @@ -452,9 +554,9 @@ public class TestHoodieArrayWritableSchemaUtils { void testRewritePlainVariantRecordToCanonicalVariantSchema() { HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainVariantRecord("variant_data"); HoodieSchema newSchema = HoodieSchema.createVariant(); - BytesWritable metadata = new BytesWritable(new byte[]{1, 2, 3}); - BytesWritable value = new BytesWritable(new byte[]{4, 5, 6}); - ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{metadata, value}); + BytesWritable metadata = new BytesWritable(new byte[] {1, 2, 3}); + BytesWritable value = new BytesWritable(new byte[] {4, 5, 6}); + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[] 
{metadata, value}); ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( record, oldSchema, newSchema, Collections.emptyMap());
