voonhous commented on code in PR #18961:
URL: https://github.com/apache/hudi/pull/18961#discussion_r3449398313
##########
hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java:
##########
@@ -398,4 +437,240 @@ public boolean allowNumericScaleChanges() {
return true;
}
}
+
+ /**
+ * Base {@link ShreddingUtils.ShreddedRow} with all accessors throwing;
concrete rows override
+ * only the accessors valid for their nesting context. This is the read-path
mirror of the
+ * write-path {@link AvroShreddedResult}: it reads Avro records to feed
Spark's reconstruction
+ * ({@link ShreddingUtils#rebuild}).
+ */
+ abstract static class BaseAvroShreddedRow implements
ShreddingUtils.ShreddedRow {
+ @Override
+ public boolean isNullAt(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+ throw unsupported();
+ }
+
+ @Override
+ public String getString(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public UUID getUuid(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ throw unsupported();
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public int numElements() {
+ throw unsupported();
+ }
+
+ private static UnsupportedOperationException unsupported() {
+ return new UnsupportedOperationException("Accessor not valid for this
shredded row context");
+ }
+ }
+
+ /**
+ * A shredded variant struct {@code {value, [metadata], typed_value}}. Maps
the Spark
+ * {@link VariantSchema} ordinals (variantIdx / topLevelMetadataIdx /
typedIdx) back to the named
+ * Avro fields, and reads {@code typed_value} for scalar/object/array
reconstruction.
+ */
+ static final class AvroVariantRow extends BaseAvroShreddedRow {
+ private final GenericRecord record;
+ private final VariantSchema schema;
+
+ AvroVariantRow(GenericRecord record, VariantSchema schema) {
+ this.record = record;
+ this.schema = schema;
+ }
+
+ private String fieldNameFor(int ordinal) {
+ if (ordinal == schema.typedIdx) {
+ return HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD;
+ }
+ if (ordinal == schema.variantIdx) {
+ return HoodieSchema.Variant.VARIANT_VALUE_FIELD;
+ }
+ if (ordinal == schema.topLevelMetadataIdx) {
+ return HoodieSchema.Variant.VARIANT_METADATA_FIELD;
+ }
+ throw new IllegalArgumentException("Unexpected shredded ordinal: " +
ordinal);
+ }
+
+ @Override public boolean isNullAt(int ordinal) {
+ return record.get(fieldNameFor(ordinal)) == null;
+ }
+
+ @Override public byte[] getBinary(int ordinal) {
+ return toByteArray((ByteBuffer) record.get(fieldNameFor(ordinal)));
+ }
+
+ // The scalar getters below read typed_value directly: Spark only invokes
them for the scalar
Review Comment:
Good call. Went with a small `typedValue()` helper instead of per-getter
comments: the scalar/struct/array getters now read `typedValue()` rather than
`record.get(VARIANT_TYPED_VALUE_FIELD)`, so the ignored `ordinal` is
obvious at every call site and the rationale lives in one comment on the
helper. (e6024c3)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/VariantShreddingInferenceInternalRowFileWriter.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.VariantShreddingSchemaInferrer;
+import org.apache.hudi.avro.VariantShreddingSchemaInferrer.VariantSample;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.io.storage.VariantShreddingInferenceFileWriter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieInternalRowFileWriter} decorator that infers a per-file
variant shredding
+ * schema from the first rows before opening the real parquet writer; the
row-writer-path
+ * sibling of {@link VariantShreddingInferenceFileWriter}, sharing its
buffering thresholds and
+ * failure semantics.
+ *
+ * <p>Meta columns including the commit seqno are composed into the row by the
handle BEFORE
+ * {@code writeRow}, so ordered replay is value-exact here by construction.
Rows and keys are
+ * copied because Spark iterators reuse their instances.</p>
+ */
+@Slf4j
+public class VariantShreddingInferenceInternalRowFileWriter implements
HoodieInternalRowFileWriter {
+
+ /** Creates the real row file writer once the inferred typed_value schemas
are known. */
+ @FunctionalInterface
+ public interface InferredRowWriterFactory {
+ HoodieInternalRowFileWriter create(Map<String, HoodieSchema>
inferredTypedValues) throws IOException;
+ }
+
+ private final List<String> variantColumns;
+ private final int[] ordinals;
+ private final VariantShreddingSchemaInferrer inferrer;
+ private final InferredRowWriterFactory writerFactory;
+ private final long maxBufferedBytes;
+ private final DefaultSizeEstimator<InternalRow> sizeEstimator = new
DefaultSizeEstimator<>();
+
+ private static final int SIZE_ESTIMATE_INTERVAL = 100;
+
+ private final List<BufferedRow> buffer = new ArrayList<>();
Review Comment:
Done - moved `SIZE_ESTIMATE_INTERVAL` to the top of the class, before the
nested interface and instance fields, matching the sibling
`VariantShreddingInferenceFileWriter`. (97c5051)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]