This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 87019a332db7 fix(hive): Tolerate pruned ArrayWritable in nested BLOB projection (#18581)
87019a332db7 is described below
commit 87019a332db78e88ccb7968739a91a45f8f1a06f
Author: voonhous <[email protected]>
AuthorDate: Thu May 7 17:29:41 2026 +0800
fix(hive): Tolerate pruned ArrayWritable in nested BLOB projection (#18581)
Fixes issue: #18577
When Hive's FetchOperator pushes nested column projection (e.g.
`SELECT blob_data.reference.external_path`) through Parquet via
`hive.io.file.readNestedColumn.paths`, the reader returns a compacted
ArrayWritable holding only the projected sub-fields in low slots,
while oldSchema stays the full 3-field canonical BLOB
(BlobLogicalType.validate rejects partial field lists; pruneDataSchema
deliberately preserves the canonical shape). Positional indexing into
the compacted array throws ArrayIndexOutOfBoundsException, and even
with a bounds guard, Hive's downstream ObjectInspector - built from
the projected schema - expects values at their compacted positions,
so the rewrite must remap, not just survive.
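For illustration, the two layouts involved (field names from the test
schema in this change; a sketch of the shapes, not reader output
verbatim):

    // canonical blob_data struct per oldSchema: [0]=type [1]=data [2]=reference
    // compacted shape Hive returns for SELECT blob_data.reference.external_path:
    //   blob_data -> [0]=reference      (one slot)
    //   reference -> [0]=external_path  (one slot)
    // indexing oldField.pos()==2 into the one-slot array is the failure above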
Introduce a projection-aware rewrite path:
- HoodieProjectionMask (new) - immutable per-level descriptor of
physical layout. isCanonicalAtThisLevel() means schema positions
apply; otherwise physicalIndexOf / physicalOrder map field names
to physical slots.
- HoodieColumnProjectionUtils.buildNestedProjectionMask() - parses
hive.io.file.readNestedColumn.paths, walks RECORD / BLOB / VARIANT,
returns the matching mask (or all() when projection is absent).
- HiveHoodieReaderContext threads the mask into a new 5-arg
rewriteRecordWithNewSchema overload.
- HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal
branches on the mask:
- rewriteCanonicalRecord - legacy positional logic with a
defensive oldField.pos() < arrayLength guard.
- rewriteCompactedRecord - iterates physicalOrder() and writes
each projected value at its compacted position so the
downstream ObjectInspector finds fields where it expects them.
The compacted path is the primary fix; the canonical-path bounds
guard is a defensive fallback.
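A minimal sketch of the new 5-arg call shape, mirroring the added
tests (prunedRecord/oldSchema/newSchema as constructed there):

    HoodieProjectionMask mask = HoodieProjectionMask.builder()
        .field("reference", HoodieProjectionMask.builder().field("external_path").build())
        .build();
    ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
        prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask);
    // output keeps the compacted layout: slot 0 holds the one-slot reference struct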
Tests: TestHoodieColumnProjectionUtils covers mask construction;
TestHoodieArrayWritableSchemaUtils covers the AIOBE reproducer,
compacted round-trip, and a canonical-shape regression.
HoodieSchemaTestUtils gains createPlainBlobRecord and
createPlainVariantRecord helpers (variant helper for upcoming
VARIANT parity).
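End-to-end, the mask is derived from the Hive conf; a sketch using
the shapes from TestHoodieColumnProjectionUtils (row is the table's
data schema built there):

    Configuration conf = new Configuration();
    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR,
        "blob_data.reference.external_path");
    HoodieProjectionMask mask =
        HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
    // mask.childOrAll("blob_data").physicalIndexOf("reference") == OptionalInt.of(0)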
---
.../hudi/common/schema/HoodieProjectionMask.java | 199 +++++++++++++++++++++
.../hudi/common/schema/HoodieSchemaTestUtils.java | 4 +-
.../hudi/hadoop/HiveHoodieReaderContext.java | 12 +-
.../hudi/hadoop/HoodieColumnProjectionUtils.java | 99 ++++++++++
.../utils/HoodieArrayWritableSchemaUtils.java | 179 ++++++++++++------
.../hadoop/TestHoodieColumnProjectionUtils.java | 92 ++++++++++
.../utils/TestHoodieArrayWritableSchemaUtils.java | 108 ++++++++++-
7 files changed, 630 insertions(+), 63 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java
new file mode 100644
index 000000000000..0f0a8de07816
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+
+/**
+ * Describes the physical field layout of an incoming columnar record when an upstream
+ * reader (e.g. Hive's Parquet reader) has compacted nested struct projection — that is,
+ * dropped non-selected sub-fields and shifted the survivors into low slots.
+ *
+ * <p>The canonical (Hudi-side) schema declares the full ordered field list. When the
+ * upstream reader projects only a subset of a struct, the resulting array's slots no
+ * longer correspond to canonical positions. This mask captures, per record level, the
+ * physical layout, so position-based access can be remapped.
+ *
+ * <p>A mask carries two pieces of information for the level it describes:
+ * <ol>
+ * <li>Whether THIS level is canonical-shaped or compacted. Hive's Parquet reader pads
+ * non-projected top-level columns with nulls (canonical), but compacts non-projected
+ * sub-fields of struct columns (compacted).
+ * <li>Optional per-child masks. A canonical level can still descend into a child whose
+ * sub-record is compacted — for example, the top-level row is canonical but the
+ * blob_data sub-record's interior is compacted by Hive's nested-projection pushdown.
+ * </ol>
+ *
+ * <p>{@link #all()} is the no-op identity used everywhere outside the projected-record
+ * path. {@link #canonicalWith(Map)} preserves canonical positions at this level while
+ * supplying compacted descent for specific children.
+ */
+public final class HoodieProjectionMask {
+
+ private static final HoodieProjectionMask ALL = new HoodieProjectionMask(false, Collections.emptyMap(), Collections.emptyMap());
+
+ // True when THIS level's ArrayWritable is compacted to only the projected fields.
+ // False when this level is canonical-shaped (full positions per schema, with nulls for unprojected).
+ private final boolean compactedAtThisLevel;
+ // Field name -> position in the projected ArrayWritable. Only meaningful when compactedAtThisLevel.
+ private final Map<String, Integer> physicalIndex;
+ // Per-child mask used during descent. May be non-empty even when this level is canonical.
+ private final Map<String, HoodieProjectionMask> childMasks;
+
+ private HoodieProjectionMask(boolean compactedAtThisLevel, Map<String, Integer> physicalIndex, Map<String, HoodieProjectionMask> childMasks) {
+ this.compactedAtThisLevel = compactedAtThisLevel;
+ this.physicalIndex = physicalIndex;
+ this.childMasks = childMasks;
+ }
+
+ public static HoodieProjectionMask all() {
+ return ALL;
+ }
+
+ /**
+ * Mask whose THIS level is canonical-shaped but whose listed children carry their own
+ * (typically compacted) descent masks. Use this at boundaries where the outer record
+ * is full but specific sub-records have been compacted by the reader (the typical
+ * Hive nested-projection-on-BLOB shape).
+ */
+ public static HoodieProjectionMask canonicalWith(Map<String, HoodieProjectionMask> childMasks) {
+ if (childMasks == null || childMasks.isEmpty()) {
+ return ALL;
+ }
+ return new HoodieProjectionMask(false, Collections.emptyMap(), Collections.unmodifiableMap(new LinkedHashMap<>(childMasks)));
+ }
+
+ /**
+ * True when no remapping or descent override applies — the rewrite can use canonical
+ * positions everywhere below this point.
+ */
+ public boolean isAll() {
+ return !compactedAtThisLevel && childMasks.isEmpty();
+ }
+
+ /**
+ * True when the level this mask describes is canonical-shaped (positions match the
+ * schema). Children may still carry compacted sub-masks.
+ */
+ public boolean isCanonicalAtThisLevel() {
+ return !compactedAtThisLevel;
+ }
+
+ /**
+ * Position of {@code fieldName} in the compacted ArrayWritable at this level. Empty
+ * when this level is canonical or the field was not projected.
+ */
+ public OptionalInt physicalIndexOf(String fieldName) {
+ if (!compactedAtThisLevel) {
+ return OptionalInt.empty();
+ }
+ Integer idx = physicalIndex.get(fieldName);
+ return idx == null ? OptionalInt.empty() : OptionalInt.of(idx);
+ }
+
+ /**
+ * Field names of this compacted level, in physical-position order. Empty when the
+ * level is canonical.
+ */
+ public List<String> physicalOrder() {
+ if (!compactedAtThisLevel) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(physicalIndex.keySet());
+ }
+
+ /**
+ * Mask to use when recursing into {@code fieldName}'s sub-record. Falls back to
+ * {@link #all()} when no override is registered for this child.
+ */
+ public HoodieProjectionMask childOrAll(String fieldName) {
+ HoodieProjectionMask child = childMasks.get(fieldName);
+ return child == null ? ALL : child;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builds a mask describing a compacted level. Insertion order defines the physical
+ * field order in the projected ArrayWritable.
+ */
+ public static final class Builder {
+ private final LinkedHashMap<String, HoodieProjectionMask> children = new LinkedHashMap<>();
+
+ public Builder field(String name) {
+ return field(name, ALL);
+ }
+
+ public Builder field(String name, HoodieProjectionMask child) {
+ children.put(name, Objects.requireNonNull(child, "child mask"));
+ return this;
+ }
+
+ public HoodieProjectionMask build() {
+ if (children.isEmpty()) {
+ return ALL;
+ }
+ LinkedHashMap<String, HoodieProjectionMask> childrenCopy = new LinkedHashMap<>(children);
+ LinkedHashMap<String, Integer> index = new LinkedHashMap<>(children.size());
+ int i = 0;
+ for (String name : childrenCopy.keySet()) {
+ index.put(name, i++);
+ }
+ return new HoodieProjectionMask(
+ true,
+ Collections.unmodifiableMap(index),
+ Collections.unmodifiableMap(childrenCopy));
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof HoodieProjectionMask)) {
+ return false;
+ }
+ HoodieProjectionMask other = (HoodieProjectionMask) o;
+ return compactedAtThisLevel == other.compactedAtThisLevel
+ && physicalIndex.equals(other.physicalIndex)
+ && childMasks.equals(other.childMasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(compactedAtThisLevel, physicalIndex, childMasks);
+ }
+
+ @Override
+ public String toString() {
+ if (isAll()) {
+ return "HoodieProjectionMask{ALL}";
+ }
+ return "HoodieProjectionMask{compacted=" + compactedAtThisLevel +
",children=" + childMasks + "}";
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
index faba63d66690..6052639d209e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
@@ -75,7 +75,7 @@ public class HoodieSchemaTestUtils {
* was stripped by Spark's TableOutputResolver Cast.
*/
public static HoodieSchema createPlainBlobRecord(String recordName) {
- HoodieSchema reference = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList(
+ HoodieSchema referenceSchema = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList(
HoodieSchemaField.of("external_path", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
HoodieSchemaField.of("offset", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null),
HoodieSchemaField.of("length", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null),
@@ -83,7 +83,7 @@ public class HoodieSchemaTestUtils {
return HoodieSchema.createRecord(recordName, null, null, false, Arrays.asList(
HoodieSchemaField.of("type", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
HoodieSchemaField.of("data", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.BYTES)), null, JsonProperties.NULL_VALUE),
- HoodieSchemaField.of("reference", HoodieSchema.createNullable(reference), null, JsonProperties.NULL_VALUE)));
+ HoodieSchemaField.of("reference", HoodieSchema.createNullable(referenceSchema), null, JsonProperties.NULL_VALUE)));
}
/**
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 9e7994f0147b..d9e99742674d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.hadoop.utils.HiveTypeUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
@@ -174,11 +176,15 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
firstRecordReader = recordReader;
}
ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader);
- if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) {
+ HoodieProjectionMask physicalMask = HoodieColumnProjectionUtils.buildNestedProjectionMask(jobConfCopy, modifiedDataSchema);
+ if (physicalMask.isAll() && HoodieSchemaCompatibility.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) {
return recordIterator;
}
- // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns
- return new CloseableMappingIterator<>(recordIterator, recordContext.projectRecord(modifiedDataSchema, requiredSchema));
+ // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns;
+ // physicalMask additionally tells the rewrite where struct sub-fields landed when Hive's
+ // Parquet reader compacted nested-column projection.
+ return new CloseableMappingIterator<>(recordIterator,
+ record -> HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, modifiedDataSchema, requiredSchema, Collections.emptyMap(), physicalMask));
}
@Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
index 8971220d1330..c89d37f1c306 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
@@ -18,6 +18,10 @@
package org.apache.hudi.hadoop;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -37,7 +41,10 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -59,6 +66,13 @@ public class HoodieColumnProjectionUtils {
* the column a's path is c.a and b's path is c.b
*/
public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+ /**
+ * Hive's Parquet reader (DataWritableReadSupport) reads this conf to compact nested
+ * struct projection. ProjectionPusher in Hive's FetchOperator sets it before invoking
+ * the record reader; Hudi only needs to consume it. Format: comma-separated dotted
+ * paths from root to leaf, e.g. {@code blob_data.reference,blob_data.reference.external_path}.
+ */
+ public static final String READ_NESTED_COLUMN_PATH_CONF_STR = "hive.io.file.readNestedColumn.paths";
private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
private static final String READ_COLUMN_NAMES_CONF_STR_DEFAULT = "";
@@ -165,4 +179,89 @@ public class HoodieColumnProjectionUtils {
return false;
}
}
+
+ /**
+ * Build a {@link HoodieProjectionMask} reflecting Hive's nested-column projection for
+ * {@code dataSchema}. Hive's Parquet reader pads non-projected top-level columns with
+ * nulls (canonical layout) but compacts non-projected sub-fields of struct columns.
+ * The returned mask preserves canonical positions at the row level and supplies a
+ * compacted descent mask for each top-level column whose interior was pruned.
+ *
+ * <p>Returns {@link HoodieProjectionMask#all()} when the conf is empty or no struct
+ * column has a sub-field projection.
+ */
+ public static HoodieProjectionMask buildNestedProjectionMask(Configuration conf, HoodieSchema dataSchema) {
+ String paths = conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, "");
+ if (paths.isEmpty()) {
+ return HoodieProjectionMask.all();
+ }
+ // Group nested paths by their top-level column. Top-level-only paths (single
+ // component, e.g. "blob_data") are ignored here — Hive does not compact at the
+ // top level, so canonical positions still apply.
+ Map<String, List<List<String>>> pathsByField = new LinkedHashMap<>();
+ for (String path : paths.split(",")) {
+ String trimmed = path.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ String[] components = trimmed.split("\\.");
+ if (components.length < 2) {
+ continue;
+ }
+ List<String> tail = new ArrayList<>(components.length - 1);
+ for (int i = 1; i < components.length; i++) {
+ tail.add(components[i].toLowerCase(Locale.ROOT));
+ }
+ String head = components[0].toLowerCase(Locale.ROOT);
+ pathsByField.computeIfAbsent(head, k -> new ArrayList<>()).add(tail);
+ }
+ if (pathsByField.isEmpty()) {
+ return HoodieProjectionMask.all();
+ }
+ HoodieSchema rowSchema = dataSchema.getNonNullType();
+ Map<String, HoodieProjectionMask> childMasks = new LinkedHashMap<>();
+ for (HoodieSchemaField topField : rowSchema.getFields()) {
+ String key = topField.name().toLowerCase(Locale.ROOT);
+ List<List<String>> nestedPaths = pathsByField.get(key);
+ if (nestedPaths == null) {
+ continue;
+ }
+ HoodieProjectionMask childMask = buildMaskForRecord(topField.schema(), nestedPaths);
+ if (!childMask.isAll()) {
+ childMasks.put(topField.name(), childMask);
+ }
+ }
+ return HoodieProjectionMask.canonicalWith(childMasks);
+ }
+
+ private static HoodieProjectionMask buildMaskForRecord(HoodieSchema schema, List<List<String>> paths) {
+ HoodieSchema recordSchema = schema.getNonNullType();
+ HoodieSchemaType type = recordSchema.getType();
+ if (type != HoodieSchemaType.RECORD && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.VARIANT) {
+ return HoodieProjectionMask.all();
+ }
+ // Group remaining components by first-level field name; lower-case to match
+ // Hive's lowercased column names.
+ Map<String, List<List<String>>> pathsByField = new LinkedHashMap<>();
+ for (List<String> path : paths) {
+ if (path.isEmpty()) {
+ // Whole sub-record projected as-is — no compaction below this point.
+ return HoodieProjectionMask.all();
+ }
+ String head = path.get(0);
+ List<String> tail = path.subList(1, path.size());
+ pathsByField.computeIfAbsent(head, k -> new ArrayList<>()).add(tail);
+ }
+ HoodieProjectionMask.Builder builder = HoodieProjectionMask.builder();
+ for (HoodieSchemaField field : recordSchema.getFields()) {
+ String key = field.name().toLowerCase(Locale.ROOT);
+ List<List<String>> childPaths = pathsByField.get(key);
+ if (childPaths == null) {
+ continue;
+ }
+ HoodieProjectionMask childMask = buildMaskForRecord(field.schema(), childPaths);
+ builder.field(field.name(), childMask);
+ }
+ return builder.build();
+ }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
index a468de4f52a5..365f7a44cc3a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
@@ -20,6 +20,7 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -63,22 +64,40 @@ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
public class HoodieArrayWritableSchemaUtils {
public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols) {
- return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>());
+ return rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, HoodieProjectionMask.all());
}
- private static Writable rewriteRecordWithNewSchema(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
+ /**
+ * Rewrite variant that is aware of an upstream reader having compacted nested struct projection.
+ *
+ * <p>{@code physicalMask} describes the actual ArrayWritable layout per record level:
+ * which sub-fields are present and at what physical index. Pass {@link HoodieProjectionMask#all()}
+ * when the input is canonical-shaped — the rewrite then reduces to canonical positional
+ * access, exactly like the legacy 4-arg overload.
+ */
+ public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, HoodieSchema oldSchema, HoodieSchema newSchema,
+ Map<String, String> renameCols, HoodieProjectionMask physicalMask) {
+ return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>(), physicalMask);
+ }
+
+ private static Writable rewriteRecordWithNewSchema(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema,
+ Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
if (writable == null) {
return null;
}
HoodieSchema oldSchemaNonNull = oldSchema.getNonNullType();
HoodieSchema newSchemaNonNull = newSchema.getNonNullType();
- if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchemaNonNull, newSchemaNonNull)) {
+ // Only short-circuit when the input is in canonical shape; an upstream-projected
+ // ArrayWritable needs the recursive rewrite to remap slots, even when the schema
+ // pair would otherwise be equivalent.
+ if (physicalMask.isAll() && HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchemaNonNull, newSchemaNonNull)) {
return writable;
}
- return rewriteRecordWithNewSchemaInternal(writable, oldSchemaNonNull, newSchemaNonNull, renameCols, fieldNames);
+ return rewriteRecordWithNewSchemaInternal(writable, oldSchemaNonNull, newSchemaNonNull, renameCols, fieldNames, physicalMask);
}
- private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
+ private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema,
+ Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
switch (newSchema.getType()) {
// BLOB/VARIANT are physically records; share the RECORD field-by-name rewrite.
case BLOB:
@@ -87,55 +106,10 @@ public class HoodieArrayWritableSchemaUtils {
if (!(writable instanceof ArrayWritable)) {
throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as a record", writable.getClass().getName()));
}
-
- ArrayWritable arrayWritable = (ArrayWritable) writable;
- List<HoodieSchemaField> fields = newSchema.getFields();
- // projection will keep the size from the "from" schema because it
gets recycled
- // and if the size changes the reader will fail
- boolean noFieldsRenaming = renameCols.isEmpty();
- String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
- Writable[] values = new Writable[Math.max(fields.size(),
arrayWritable.get().length)];
- for (int i = 0; i < fields.size(); i++) {
- HoodieSchemaField newField = newSchema.getFields().get(i);
- String newFieldName = newField.name();
- fieldNames.push(newFieldName);
- Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming
- ? oldSchema.getField(newFieldName)
- : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix,
newFieldName, renameCols));
- if (oldFieldOpt.isPresent()) {
- HoodieSchemaField oldField = oldFieldOpt.get();
- values[i] =
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()],
oldField.schema(), newField.schema(), renameCols, fieldNames);
- } else if (newField.defaultVal().isPresent() &&
newField.defaultVal().get().equals(HoodieSchema.NULL_VALUE)) {
- values[i] = NullWritable.get();
- } else if (!newField.schema().isNullable() &&
newField.defaultVal().isEmpty()) {
- throw new SchemaCompatibilityException("Field " +
createFullName(fieldNames) + " has no default value and is non-nullable");
- } else if (newField.defaultVal().isPresent()) {
- switch (newField.getNonNullSchema().getType()) {
- case BOOLEAN:
- values[i] = new BooleanWritable((Boolean)
newField.defaultVal().get());
- break;
- case INT:
- values[i] = new IntWritable((Integer)
newField.defaultVal().get());
- break;
- case LONG:
- values[i] = new LongWritable((Long)
newField.defaultVal().get());
- break;
- case FLOAT:
- values[i] = new FloatWritable((Float)
newField.defaultVal().get());
- break;
- case DOUBLE:
- values[i] = new DoubleWritable((Double)
newField.defaultVal().get());
- break;
- case STRING:
- values[i] = new Text(newField.defaultVal().toString());
- break;
- default:
- throw new SchemaCompatibilityException("Field " +
createFullName(fieldNames) + " has no default value");
- }
- }
- fieldNames.pop();
+ if (physicalMask.isCanonicalAtThisLevel()) {
+ return rewriteCanonicalRecord((ArrayWritable) writable, oldSchema,
newSchema, renameCols, fieldNames, physicalMask);
}
- return new ArrayWritable(Writable.class, values);
+ return rewriteCompactedRecord((ArrayWritable) writable, oldSchema,
newSchema, renameCols, fieldNames, physicalMask);
case ENUM:
if ((writable instanceof BytesWritable)) {
@@ -155,7 +129,7 @@ public class HoodieArrayWritableSchemaUtils {
ArrayWritable array = (ArrayWritable) writable;
fieldNames.push("element");
for (int i = 0; i < array.get().length; i++) {
- array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+ array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, HoodieProjectionMask.all());
}
fieldNames.pop();
return array;
@@ -167,7 +141,8 @@ public class HoodieArrayWritableSchemaUtils {
fieldNames.push("value");
for (int i = 0; i < map.get().length; i++) {
Writable mapEntry = map.get()[i];
- ((ArrayWritable) mapEntry).get()[1] = rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+ ((ArrayWritable) mapEntry).get()[1] =
+ rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames, HoodieProjectionMask.all());
}
return map;
@@ -179,6 +154,100 @@ public class HoodieArrayWritableSchemaUtils {
}
}
+ /**
+ * Canonical record rewrite: input ArrayWritable matches {@code oldSchema}'s field
+ * positions (with possibly trailing nulls); output is sized to
+ * {@code max(newSchema field count, input array length)} so any trailing slots Hive
+ * left in a recycled row buffer are preserved, while schema-evolution semantics still
+ * apply to the leading {@code newSchema}-sized region (default values, null padding,
+ * non-nullable-without-default throws).
+ */
+ private static Writable rewriteCanonicalRecord(ArrayWritable arrayWritable, HoodieSchema oldSchema, HoodieSchema newSchema,
+ Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
+ List<HoodieSchemaField> fields = newSchema.getFields();
+ boolean noFieldsRenaming = renameCols.isEmpty();
+ String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
+ Writable[] values = new Writable[Math.max(fields.size(), arrayWritable.get().length)];
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchemaField newField = fields.get(i);
+ String newFieldName = newField.name();
+ fieldNames.push(newFieldName);
+ Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming
+ ? oldSchema.getField(newFieldName)
+ : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, newFieldName, renameCols));
+
+ // Bounds-check because some upstream readers may still hand back arrays shorter
+ // than the schema declares (no projection mask supplied).
+ if (oldFieldOpt.isPresent() && oldFieldOpt.get().pos() < arrayWritable.get().length) {
+ HoodieSchemaField oldField = oldFieldOpt.get();
+ HoodieProjectionMask childMask = physicalMask.childOrAll(oldField.name());
+ values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames, childMask);
+ } else if (newField.defaultVal().isPresent() && newField.defaultVal().get().equals(HoodieSchema.NULL_VALUE)) {
+ values[i] = NullWritable.get();
+ } else if (!newField.schema().isNullable() && newField.defaultVal().isEmpty()) {
+ throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value and is non-nullable");
+ } else if (newField.defaultVal().isPresent()) {
+ switch (newField.getNonNullSchema().getType()) {
+ case BOOLEAN:
+ values[i] = new BooleanWritable((Boolean) newField.defaultVal().get());
+ break;
+ case INT:
+ values[i] = new IntWritable((Integer) newField.defaultVal().get());
+ break;
+ case LONG:
+ values[i] = new LongWritable((Long) newField.defaultVal().get());
+ break;
+ case FLOAT:
+ values[i] = new FloatWritable((Float) newField.defaultVal().get());
+ break;
+ case DOUBLE:
+ values[i] = new DoubleWritable((Double) newField.defaultVal().get());
+ break;
+ case STRING:
+ values[i] = new Text(newField.defaultVal().toString());
+ break;
+ default:
+ throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value");
+ }
+ }
+ fieldNames.pop();
+ }
+ return new ArrayWritable(Writable.class, values);
+ }
+
+ /**
+ * Compacted record rewrite: input ArrayWritable matches the upstream reader's
+ * projected schema (only the projected sub-fields, in declared order). The output
+ * preserves the same physical layout — Hive's downstream {@code ArrayWritableObjectInspector}
+ * was constructed from the projected schema and expects compacted positions.
+ * Per-element schema conversion still applies (e.g. plain STRING → canonical ENUM).
+ */
+ private static Writable rewriteCompactedRecord(ArrayWritable arrayWritable, HoodieSchema oldSchema, HoodieSchema newSchema,
+ Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
+ Writable[] inputs = arrayWritable.get();
+ Writable[] values = new Writable[inputs.length];
+ List<String> order = physicalMask.physicalOrder();
+ for (int physIdx = 0; physIdx < order.size(); physIdx++) {
+ if (physIdx >= inputs.length) {
+ // Mask claims a field at a position the reader didn't fill; defensive — leave null.
+ continue;
+ }
+ String physicalFieldName = order.get(physIdx);
+ Option<HoodieSchemaField> oldFieldOpt = oldSchema.getField(physicalFieldName);
+ Option<HoodieSchemaField> newFieldOpt = newSchema.getField(physicalFieldName);
+ if (oldFieldOpt.isEmpty() || newFieldOpt.isEmpty()) {
+ // The reader returned a sub-field neither side knows about — pass through.
+ values[physIdx] = inputs[physIdx];
+ continue;
+ }
+ fieldNames.push(physicalFieldName);
+ HoodieProjectionMask childMask = physicalMask.childOrAll(physicalFieldName);
+ values[physIdx] = rewriteRecordWithNewSchema(inputs[physIdx], oldFieldOpt.get().schema(), newFieldOpt.get().schema(), renameCols, fieldNames, childMask);
+ fieldNames.pop();
+ }
+ return new ArrayWritable(Writable.class, values);
+ }
+
public static Writable rewritePrimaryType(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema) {
if (oldSchema.getType() == newSchema.getType()) {
switch (oldSchema.getType()) {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
index e7467acef5d0..fcf5ac44efba 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
@@ -18,10 +18,21 @@
package org.apache.hudi.hadoop;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaTestUtils;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -73,4 +84,85 @@ public class TestHoodieColumnProjectionUtils {
TypeInfo typeInfo11 = TypeInfoUtils.getTypeInfosFromTypeString(col11).get(0);
assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo11));
}
+
+ @Test
+ void testBuildNestedProjectionMaskEmptyConfYieldsAll() {
+ HoodieSchema row = rowSchemaWithBlobColumn();
+ Configuration conf = new Configuration();
+ HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+ assertTrue(mask.isAll());
+ }
+
+ @Test
+ void testBuildNestedProjectionMaskTopLevelOnlyYieldsAll() {
+ // Top-level pruning never compacts; only nested paths produce a mask.
+ HoodieSchema row = rowSchemaWithBlobColumn();
+ Configuration conf = new Configuration();
+ conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data,id");
+ HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+ assertTrue(mask.isAll());
+ }
+
+ @Test
+ void testBuildNestedProjectionMaskForBlobReferenceProjection() {
+ HoodieSchema row = rowSchemaWithBlobColumn();
+ Configuration conf = new Configuration();
+ conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference");
+ HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+ assertFalse(mask.isAll());
+ assertTrue(mask.isCanonicalAtThisLevel());
+ HoodieProjectionMask blobChild = mask.childOrAll("blob_data");
+ assertFalse(blobChild.isAll());
+ assertFalse(blobChild.isCanonicalAtThisLevel());
+ assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("reference"));
+ assertEquals(OptionalInt.empty(), blobChild.physicalIndexOf("type"));
+ assertEquals(OptionalInt.empty(), blobChild.physicalIndexOf("data"));
+ assertTrue(blobChild.childOrAll("reference").isAll());
+ }
+
+ @Test
+ void testBuildNestedProjectionMaskForExternalPathProjection() {
+ HoodieSchema row = rowSchemaWithBlobColumn();
+ Configuration conf = new Configuration();
+ conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference.external_path");
+ HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+ HoodieProjectionMask blobChild = mask.childOrAll("blob_data");
+ assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("reference"));
+ HoodieProjectionMask refChild = blobChild.childOrAll("reference");
+ assertFalse(refChild.isAll());
+ assertEquals(OptionalInt.of(0), refChild.physicalIndexOf("external_path"));
+ assertEquals(OptionalInt.empty(), refChild.physicalIndexOf("offset"));
+ }
+
+ @Test
+ void testBuildNestedProjectionMaskOrdersByDeclaredSchema() {
+ // Physical order follows the schema, not the conf-path order.
+ HoodieSchema row = rowSchemaWithBlobColumn();
+ Configuration conf = new Configuration();
+ conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference,blob_data.type");
+ HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+ HoodieProjectionMask blobChild = mask.childOrAll("blob_data");
+ assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("type"));
+ assertEquals(OptionalInt.of(1), blobChild.physicalIndexOf("reference"));
+ }
+
+ @Test
+ void testBuildNestedProjectionMaskCaseInsensitive() {
+ HoodieSchema row = rowSchemaWithBlobColumn();
+ Configuration conf = new Configuration();
+ conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "BLOB_DATA.Reference.External_Path");
+ HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+ assertFalse(mask.childOrAll("blob_data").isAll());
+ assertEquals(OptionalInt.of(0), mask.childOrAll("blob_data").physicalIndexOf("reference"));
+ }
+
+ private static HoodieSchema rowSchemaWithBlobColumn() {
+ HoodieSchemaField idField = HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG), null, null);
+ HoodieSchemaField blobField = HoodieSchemaField.of("blob_data", HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"), null, null);
+ return HoodieSchema.createRecord("test_row", null, null, false, Arrays.asList(idField, blobField));
+ }
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index 16188b520074..ff5bdbc18694 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.utils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaTestUtils;
import org.apache.hudi.common.schema.HoodieSchemaType;
@@ -334,6 +335,107 @@ public class TestHoodieArrayWritableSchemaUtils {
assertSame(bytes, rewritten);
}
+ @Test
+ void testRewriteBlobWithPrunedArrayWritableFillsMissingFieldsWithNull() {
+ HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+ HoodieSchema newSchema = HoodieSchema.createBlob();
+ ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] {
+ new Text("INLINE")
+ });
+
+ ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+ prunedRecord, oldSchema, newSchema, Collections.emptyMap());
+
+ assertEquals(3, rewritten.get().length);
+ assertInstanceOf(BytesWritable.class, rewritten.get()[0]);
+ assertEquals("INLINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes()));
+ assertEquals(NullWritable.get(), rewritten.get()[1]);
+ assertEquals(NullWritable.get(), rewritten.get()[2]);
+ }
+
+ @Test
+ void testRewriteBlobWithPrunedReferenceProjection() {
+ // SELECT blob_data.reference: compacted-shape input must round-trip unchanged so
+ // Hive's projected-schema ObjectInspector finds reference at slot 0.
+ HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+ HoodieSchema newSchema = HoodieSchema.createBlob();
+ ArrayWritable referenceSubstruct = new ArrayWritable(Writable.class, new Writable[] {
+ new Text("blobs/updated-1"),
+ new LongWritable(0L),
+ new LongWritable(11L),
+ new BooleanWritable(false)
+ });
+ ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] {
+ referenceSubstruct
+ });
+ HoodieProjectionMask mask = HoodieProjectionMask.builder().field("reference").build();
+
+ ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+ prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask);
+
+ assertEquals(1, rewritten.get().length);
+ ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[0];
+ assertEquals(4, rewrittenRef.get().length);
+ assertEquals(new Text("blobs/updated-1"), rewrittenRef.get()[0]);
+ assertEquals(new LongWritable(0L), rewrittenRef.get()[1]);
+ assertEquals(new LongWritable(11L), rewrittenRef.get()[2]);
+ assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]);
+ }
+
+ @Test
+ void testRewriteBlobWithPrunedReferenceExternalPathProjection() {
+ // SELECT blob_data.reference.external_path — reproducer for the CCE at :149.
+ HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+ HoodieSchema newSchema = HoodieSchema.createBlob();
+ ArrayWritable prunedReference = new ArrayWritable(Writable.class, new Writable[] {
+ new Text("blobs/updated-1")
+ });
+ ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] {
+ prunedReference
+ });
+ HoodieProjectionMask mask = HoodieProjectionMask.builder()
+ .field("reference", HoodieProjectionMask.builder().field("external_path").build())
+ .build();
+
+ ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+ prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask);
+
+ assertEquals(1, rewritten.get().length);
+ ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[0];
+ assertEquals(1, rewrittenRef.get().length);
+ assertEquals(new Text("blobs/updated-1"), rewrittenRef.get()[0]);
+ }
+
+ @Test
+ void testRewriteBlobWithCanonicalShapeStillWorksAfterMaskWiring() {
+ // Regression guard: mask=all() must keep the legacy canonical-shape behavior.
+ HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+ HoodieSchema newSchema = HoodieSchema.createBlob();
+ ArrayWritable referenceSubstruct = new ArrayWritable(Writable.class, new Writable[] {
+ new Text("blobs/path-1"),
+ new LongWritable(0L),
+ new LongWritable(11L),
+ new BooleanWritable(false)
+ });
+ ArrayWritable record = new ArrayWritable(Writable.class, new Writable[] {
+ new Text("OUT_OF_LINE"),
+ NullWritable.get(),
+ referenceSubstruct
+ });
+
+ ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+ record, oldSchema, newSchema, Collections.emptyMap(), HoodieProjectionMask.all());
+
+ assertInstanceOf(BytesWritable.class, rewritten.get()[0]);
+ assertEquals("OUT_OF_LINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes()));
+ assertEquals(NullWritable.get(), rewritten.get()[1]);
+ ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[2];
+ assertEquals(new Text("blobs/path-1"), rewrittenRef.get()[0]);
+ assertEquals(new LongWritable(0L), rewrittenRef.get()[1]);
+ assertEquals(new LongWritable(11L), rewrittenRef.get()[2]);
+ assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]);
+ }
+
private void validateRewriteWithAvro(
Writable oldWritable,
HoodieSchema oldSchema,
@@ -452,9 +554,9 @@ public class TestHoodieArrayWritableSchemaUtils {
void testRewritePlainVariantRecordToCanonicalVariantSchema() {
HoodieSchema oldSchema =
HoodieSchemaTestUtils.createPlainVariantRecord("variant_data");
HoodieSchema newSchema = HoodieSchema.createVariant();
- BytesWritable metadata = new BytesWritable(new byte[]{1, 2, 3});
- BytesWritable value = new BytesWritable(new byte[]{4, 5, 6});
- ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{metadata, value});
+ BytesWritable metadata = new BytesWritable(new byte[] {1, 2, 3});
+ BytesWritable value = new BytesWritable(new byte[] {4, 5, 6});
+ ArrayWritable record = new ArrayWritable(Writable.class, new Writable[] {metadata, value});
ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
record, oldSchema, newSchema, Collections.emptyMap());