laskoviymishka commented on code in PR #16370:
URL: https://github.com/apache/iceberg/pull/16370#discussion_r3284275290

##########
data/src/main/java/org/apache/iceberg/data/RecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantValue;
+
+/**
+ * Generic {@link Record} implementation that extracts variant values from {@link Record#get(int)}
+ * using positional indices aligned with {@link Schema#columns()}.
+ */
+class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> {
+
+  RecordVariantShreddingAnalyzer() {}
+
+  @Override
+  protected int resolveColumnIndex(Schema engineSchema, String columnName) {
+    if (engineSchema == null) {
+      return -1;

Review Comment:
Returning `-1` on a null `engineSchema` silently disables shredding for the whole column.
Once the writer-side fix is in (see my comment in `ParquetFormatModel.java`), this should be unreachable in normal paths, so I'd rather it fail loudly than degrade silently:
```java
Preconditions.checkNotNull(engineSchema, "Invalid engine schema: null");
```
Spark's analyzer does the same: it calls `sparkSchema.fieldIndex(name)` with no null handling.

##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java:
##########
@@ -129,17 +131,22 @@ private WriteBuilderWrapper(
         EncryptedOutputFile outputFile,
         WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
         VariantShreddingAnalyzer<D, S> variantAnalyzer,
-        UnaryOperator<D> copyFunc) {
+        UnaryOperator<D> copyFunc,
+        Class<S> schemaType) {
       this.internal = Parquet.write(outputFile);
       this.writerFunction = writerFunction;
       this.variantAnalyzer = variantAnalyzer;
       this.copyFunc = copyFunc;
+      this.schemaType = schemaType;
     }

     @Override
     public ModelWriteBuilder<D, S> schema(Schema newSchema) {
       this.schema = newSchema;
       internal.schema(newSchema);
+      if (Schema.class.equals(schemaType)) {

Review Comment:
Right idea, but I think two things need to happen here for this to actually work end-to-end.

First, this auto-set fires every time `schema(...)` is called and silently clobbers a caller-set `engineSchema` if the order happens to be `engineSchema(custom).schema(s)`. I'd gate it so explicit wins:
```java
if (this.engineSchema == null && Schema.class.equals(schemaType)) {
  this.engineSchema = (S) newSchema;
}
```
Second, and this is the bigger one: `RegistryBasedFileWriterFactory.newDataWriter()` calls `.schema(s).engineSchema(inputSchema())` back-to-back, and for the KC path `GenericFileWriterFactory.Builder.build()` goes through the deprecated 10-arg constructor and passes `null` for `inputSchema`. So this auto-populate does the right thing, then `.engineSchema(null)` immediately overwrites it because `WriteBuilderWrapper.engineSchema(...)` (just below) has no null guard.
By the time `buildShreddedAppender()` runs, `resolveColumnIndex` gets `null`, returns `-1` for every column, and shredding silently doesn't activate for KC writes. A null guard on the `engineSchema(...)` setter (skip the assignment if `newSchema == null`) would make the auto-populated value stick.

Also worth a `@SuppressWarnings("unchecked")` on the cast (or pulling it into a small helper); it currently emits an unchecked-cast warning.

I'll ping @AnatolyPopov to take a look too since he's closer to the KC side and can sanity-check the wiring claim.

##########
data/src/main/java/org/apache/iceberg/data/RecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantValue;
+
+/**
+ * Generic {@link Record} implementation that extracts variant values from {@link Record#get(int)}
+ * using positional indices aligned with {@link Schema#columns()}.
+ */
+class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> {
+
+  RecordVariantShreddingAnalyzer() {}
+
+  @Override
+  protected int resolveColumnIndex(Schema engineSchema, String columnName) {
+    if (engineSchema == null) {
+      return -1;
+    }
+
+    List<NestedField> cols = engineSchema.columns();
+    for (int i = 0; i < cols.size(); i++) {

Review Comment:
Non-blocker, but worth thinking about: this is O(n) per variant column, so O(n·k) for wide schemas (CDC tables with 200+ columns are not uncommon). Spark uses `sparkSchema.fieldIndex(name)` which is O(1). A precomputed `Map<String, Integer>` cached on the instance would match that.

Also, while we're here: the class Javadoc says "positional indices aligned with `Schema#columns()`", and `extractVariantValues` then uses `record.get(int)` against that same index. That's a real contract: the records being analyzed must have been built against the same schema passed as `engineSchema`. Worth one sentence in the Javadoc making that explicit so a future caller doesn't pass a projected schema and get silent misalignment.

##########
data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRecordVariantShreddingAnalyzer {
+
+  private static final Schema VARIANT_AFTER_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "v", Types.VariantType.get()));
+
+  private static final Schema VARIANT_BEFORE_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "v", Types.VariantType.get()),
+          Types.NestedField.required(2, "id", Types.LongType.get()));
+
+  private Variant variant;
+  private List<Record> records;
+
+  @TempDir private Path temp;
+
+  @BeforeEach
+  public void before() {
+    ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(42),
+                "b", Variants.of("hello")));
+    variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 3L, "v", variant)));
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() {
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    GroupType typedValue = shreddedTypes.get(2).asGroupType();
+    assertThat(typedValue.getName()).isEqualTo("typed_value");
+    assertThat(typedValue.containsField("a")).isTrue();
+    assertThat(typedValue.containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() {
+    GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA);
+    List<Record> variantFirstRecords =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("v", variant, "id", 1L)),
+            record.copy(ImmutableMap.of("v", variant, "id", 2L)));
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, VARIANT_BEFORE_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(1);
+    assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsSkipsNullVariantValues() {
+    GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant.setField("id", 1L);
+    withVariant.setField("v", variant);
+
+    GenericRecord withNullVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withNullVariant.setField("id", 2L);
+    withNullVariant.setField("v", null);
+
+    GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant2.setField("id", 3L);
+    withVariant2.setField("v", variant);
+
+    List<Record> recordsWithNulls = ImmutableList.of(withVariant, withNullVariant, withVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue();
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWithAllNullVariantValues() {
+    GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant1.setField("id", 1L);
+    nullVariant1.setField("v", null);
+
+    GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant2.setField("id", 2L);
+    nullVariant2.setField("v", null);
+
+    List<Record> allNullVariants = ImmutableList.of(nullVariant1, nullVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).isEmpty();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNonVariantValues() {
+    GenericRecord invalidRecord = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    invalidRecord.setField("id", 1L);
+    invalidRecord.setField("v", "not-a-variant");
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(
+            () ->
+                analyzer.analyzeVariantColumns(
+                    ImmutableList.of(invalidRecord),
+                    VARIANT_AFTER_ID_SCHEMA,
+                    VARIANT_AFTER_ID_SCHEMA))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Expected Variant at index 1 but was: java.lang.String");
+  }
+
+  @Test
+  public void testFormatModelRegistryShreddingRoundTrip() throws IOException {
+    OutputFile outputFile = Files.localOutput(temp.resolve("variant-shredded.parquet").toFile());
+    EncryptedOutputFile encryptedOutputFile = EncryptedFiles.plainAsEncryptedOutput(outputFile);
+
+    FileWriterBuilder<DataWriter<Record>, Object> writeBuilder =
+        FormatModelRegistry.dataWriteBuilder(FileFormat.PARQUET, Record.class, encryptedOutputFile);
+
+    try (DataWriter<Record> writer =
+        writeBuilder
+            .schema(VARIANT_AFTER_ID_SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
+            .setAll(
+                ImmutableMap.of(
+                    TableProperties.PARQUET_SHRED_VARIANTS, "true",
+                    TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2"))
+            .build()) {
+      for (Record rec : records) {
+        writer.write(rec);
+      }
+    }
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) {
+      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+      GroupType variantGroup = parquetSchema.getType("v").asGroupType();
+      assertThat(variantGroup.containsField("typed_value")).isTrue();
+
+      GroupType typedValue = variantGroup.getType("typed_value").asGroupType();
+      assertThat(typedValue.containsField("a")).isTrue();
+      assertThat(typedValue.containsField("b")).isTrue();
+    }
+
+    try (ParquetReader<Group> rawReader =
+        ParquetReader.builder(
+                new GroupReadSupport(), new org.apache.hadoop.fs.Path(outputFile.location()))
+            .build()) {
+      Group row = rawReader.read();
+      Group variantData = row.getGroup("v", 0);
+      assertThat(variantData.getFieldRepetitionCount("value")).isEqualTo(0);
+
+      Group typedValue = variantData.getGroup("typed_value", 0);
+      assertThat(typedValue.getGroup("a", 0).getInteger("typed_value", 0)).isEqualTo(42);

Review Comment:
Two things on this round-trip test.

The high-level read-back via `InternalTestHelpers.assertEquals` covers all three records, which is great. The raw Parquet read here only checks row 0 though, and with `BUFFER_SIZE=2` and 3 input rows the second buffer flush (rows 2→3 boundary) is exactly what we'd want physical evidence for. A short `while ((row = rawReader.read()) != null)` loop asserting each row's `typed_value` would close that. Fine as a follow-up.

More importantly: there's no test that goes through `RegistryBasedFileWriterFactory` (the actual KC production path).
This test calls `dataWriteBuilder(...).schema(...).build()` directly and never touches `.engineSchema(...)`, so it exercises exactly the path the PR's `schema()` fix targets but not the KC shape (`.schema(s).engineSchema(null)`), which is why the engineSchema overwrite I flagged in `ParquetFormatModel.java` slipped through. A test that builds via `GenericFileWriterFactory.Builder.writerProperties("write.parquet.shred-variants", "true").build()` and asserts `typed_value` groups appear in the output would catch that regression.
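
To make the ordering concern concrete, here is a minimal, self-contained sketch of the `.schema(s).engineSchema(null)` shape. Everything in it is hypothetical: `SketchWriteBuilder`, its `guarded` flag, and `resolvedEngineSchema()` are stand-ins invented for illustration, with `String` playing the role of the schema types. It is not Iceberg's `WriteBuilderWrapper`, and it uses `Class#isInstance` with `Class#cast` in place of the `Schema.class.equals(schemaType)` check and unchecked cast from the PR.

```java
/** Hypothetical stand-in for a write builder; not Iceberg's real WriteBuilderWrapper. */
final class SketchWriteBuilder<S> {
  private final Class<S> schemaType;
  private final boolean guarded; // illustration only: toggles the null guard on the setter
  private Object schema;
  private S engineSchema;

  SketchWriteBuilder(Class<S> schemaType, boolean guarded) {
    this.schemaType = schemaType;
    this.guarded = guarded;
  }

  SketchWriteBuilder<S> schema(Object newSchema) {
    this.schema = newSchema;
    // Auto-populate only when the caller has not set an engine schema explicitly.
    if (this.engineSchema == null && schemaType.isInstance(newSchema)) {
      this.engineSchema = schemaType.cast(newSchema); // checked cast, no unchecked warning
    }
    return this;
  }

  SketchWriteBuilder<S> engineSchema(S newEngineSchema) {
    if (guarded && newEngineSchema == null) {
      return this; // null guard: keep whatever schema(...) auto-populated
    }
    this.engineSchema = newEngineSchema;
    return this;
  }

  S resolvedEngineSchema() {
    return engineSchema;
  }
}

final class EngineSchemaOrderingSketch {
  public static void main(String[] args) {
    // KC-shaped order without a guard: the auto-populated value is overwritten by null.
    String unguarded =
        new SketchWriteBuilder<String>(String.class, false)
            .schema("s")
            .engineSchema(null)
            .resolvedEngineSchema();

    // Same order with the guard: the auto-populated value survives.
    String guarded =
        new SketchWriteBuilder<String>(String.class, true)
            .schema("s")
            .engineSchema(null)
            .resolvedEngineSchema();

    // Explicit engine schema set first: schema(...) must not clobber it.
    String explicit =
        new SketchWriteBuilder<String>(String.class, true)
            .engineSchema("custom")
            .schema("s")
            .resolvedEngineSchema();

    System.out.println("unguarded=" + unguarded); // prints unguarded=null
    System.out.println("guarded=" + guarded); // prints guarded=s
    System.out.println("explicit=" + explicit); // prints explicit=custom
  }
}
```

A regression test for the real builder would presumably assert the same three outcomes, driven through the actual factory path rather than a stand-in.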
