ayushtkn commented on code in PR #6152:
URL: https://github.com/apache/hive/pull/6152#discussion_r2616167432
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java:
##########
@@ -0,0 +1,1524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionInputFile;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.PlainParquetConfiguration;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type.ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153)

Review Comment:
   We copied a 1KLOC file just to fast-track this? We didn't have a Hive release coming up either.
   We have done patching before, but for stuff the Iceberg community didn't agree to, for bug fixes, or to maintain compatibility with our current version. The Iceberg people hardly accept stuff; for accepted stuff we should have waited for an official release. This isn't a small file, nor is it anything urgent.



##########
iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java:
##########
@@ -187,6 +195,68 @@ public void testSpecialCharacters() {
     Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1));
   }
 
+  @Test
+  public void testVariantSelectProjection() throws IOException {
+    assumeTrue(fileFormat == FileFormat.PARQUET);
+    assumeTrue(!isVectorized);
+
+    TableIdentifier table = TableIdentifier.of("default", "variant_projection");
+    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+    shell.executeStatement(
+        String.format(
+            "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG STORED AS %s %s %s",
+            table,
+            fileFormat,
+            testTables.locationForCreateTableSQL(table),
+            testTables.propertiesForCreateTableSQL(
+                ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true"))));
+
+    shell.executeStatement(
+        String.format(
+            "INSERT INTO %s VALUES "
+                + "(1, parse_json('{\"name\":\"Alice\",\"age\":30}')),"

Review Comment:
   If you change this to `"(1, parse_json('null')),"`, the whole feature gets disabled, and unless you remove the asserted values, this assertion fails:
   ```
   assertThat(variantType.containsField("typed_value")).isTrue();
   ```
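   Editor's note: a hedged sketch of a regression test for that null-first-row scenario, reusing only the helpers visible in the quoted diff (`shell`, `testTables`, `fileFormat`, `isVectorized`). The table name is made up, and the final schema check is left as a comment because the footer-reading helper isn't shown in this hunk:
   ```java
   @Test
   public void testVariantShreddingWithNullFirstRow() throws IOException {
     assumeTrue(fileFormat == FileFormat.PARQUET);
     assumeTrue(!isVectorized);

     TableIdentifier table = TableIdentifier.of("default", "variant_null_first");
     shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
     shell.executeStatement(
         String.format(
             "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG STORED AS %s %s %s",
             table,
             fileFormat,
             testTables.locationForCreateTableSQL(table),
             testTables.propertiesForCreateTableSQL(
                 ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true"))));

     // A null variant as the very first row: schema inference sees nothing to shred.
     shell.executeStatement(
         String.format(
             "INSERT INTO %s VALUES (1, parse_json('null')), "
                 + "(2, parse_json('{\"name\":\"Alice\",\"age\":30}'))",
             table));

     // With the current first-record sampling the written file has no shredded
     // layout, so the check from the quoted test would fail here:
     //   assertThat(variantType.containsField("typed_value")).isTrue();
   }
   ```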
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+
+public class VariantUtil {
+
+  private VariantUtil() {
+  }
+
+  /**
+   * Create a VariantShreddingFunction if variant shredding is enabled and the schema has variant columns.
+   *
+   * @param schema The Iceberg schema
+   * @param sampleRecord A sample record to infer variant schemas from actual data (can be null)
+   * @param properties Table properties to check if variant shredding is enabled
+   * @return An Optional containing the VariantShreddingFunction if applicable
+   */
+  public static Optional<VariantShreddingFunction> variantShreddingFunc(
+      Schema schema,
+      Supplier<Record> sampleRecord,
+      Map<String, String> properties) {
+
+    // Preconditions: must have variant columns + property enabled
+    if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {
+      return Optional.empty();
+    }
+
+    VariantShreddingFunction fn =
+        constructVariantShreddingFunc(sampleRecord.get(), schema);
+
+    return Optional.of(fn);
+  }
+
+  private static VariantShreddingFunction constructVariantShreddingFunc(
+      Record sampleRecord, Schema schema) {
+
+    return (id, name) -> {
+      // Validate the field exists and is a variant type
+      Types.NestedField field = schema.findField(id);
+
+      if (field == null || !(field.type() instanceof Types.VariantType)) {
+        return null; // Not a variant field, no shredding
+      }
+
+      // If we have a sample record, try to generate schema from actual data
+      if (sampleRecord != null) {
+        try {
+          Object variantValue = sampleRecord.getField(name);
+          if (variantValue instanceof Variant variant) {

Review Comment:
   I think we should infer the schema from the first non-null `variantValue`.
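   Editor's note: a minimal sketch of that suggestion, assuming the caller can buffer a few records before the Parquet writer is created and then feed the result to `HiveFileWriterFactory.initialize`. The class name, the buffering cap, and the caller-supplied `Deque` are hypothetical, not part of this PR:
   ```java
   import java.util.Deque;
   import java.util.Iterator;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.variants.Variant;

   final class FirstNonNullSampler {

     private FirstNonNullSampler() {
     }

     /**
      * Buffers records until one carries a non-null Variant in {@code fieldName},
      * or {@code maxBuffered} records have been seen. The returned record becomes
      * the shredding sample; the caller must still write the buffered records
      * once the writer has been created.
      */
     static Record findSample(Iterator<Record> rows, String fieldName,
         Deque<Record> buffer, int maxBuffered) {
       while (rows.hasNext() && buffer.size() < maxBuffered) {
         Record record = rows.next();
         buffer.addLast(record);
         if (record.getField(fieldName) instanceof Variant) {
           return record; // first record with a non-null variant value
         }
       }
       return null; // no usable sample found: the column stays unshredded
     }
   }
   ```
   With something along these lines feeding the sample, a leading `parse_json('null')` row would no longer disable shredding for the whole file, which also addresses the test scenario above.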
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java:
##########
@@ -0,0 +1,103 @@
+          if (variantValue instanceof Variant variant) {
+            // Use ParquetVariantUtil to generate schema from actual variant value
+            return ParquetVariantUtil.toParquetSchema(variant.value());
+          }
+        } catch (Exception e) {
+          // Fall through to default schema
+        }
+      }
+      return null;
+    };
+  }
+
+  /**
+   * Check if the schema contains any variant columns.
+   */
+  private static boolean hasVariantColumns(Schema schema) {
+    return schema.columns().stream()
+        .anyMatch(field -> field.type() instanceof Types.VariantType);

Review Comment:
   What if a field is of type `struct` and within it there is a `VARIANT` data type? A table like
   ```
   CREATE TABLE t_struct_variant (
     id INT,
     s STRUCT<
       user_id: INT,
       payload: VARIANT
     >
   ) STORED BY ICEBERG
   TBLPROPERTIES (
     'format-version'='3',
     'variant.shredding.enabled'='true'
   );

   -- Insert JSON structures
   INSERT INTO t_struct_variant VALUES
   (
     1,
     named_struct(
       'user_id', 101,
       'payload', parse_json('{"name":"Alice","age":30}')
     )
   ),
   (
     2,
     named_struct(
       'user_id', 102,
       'payload', parse_json('{"name":"Bob"}')
     )
   ),
   (
     3,
     named_struct(
       'user_id', 103,
       'payload', parse_json('{"active":true,"score":9.5}')
     )
   );
   ```
   did have a variant column, but it got skipped.
   (screenshot: https://github.com/user-attachments/assets/9184b744-6edf-47be-9a14-e1d6ab3a9a46)
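   Editor's note: a sketch of a recursive check that would also catch variants nested inside structs, lists, and maps, using only Iceberg's public `Type` API. This is an assumption about the intended fix, not code from the PR:
   ```java
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.types.Type;
   import org.apache.iceberg.types.Types;

   final class NestedVariantCheck {

     private NestedVariantCheck() {
     }

     /** True if any column, at any nesting depth, is a VARIANT. */
     static boolean hasVariantColumns(Schema schema) {
       return schema.columns().stream()
           .map(Types.NestedField::type)
           .anyMatch(NestedVariantCheck::containsVariant);
     }

     private static boolean containsVariant(Type type) {
       if (type instanceof Types.VariantType) {
         return true;
       }
       // Structs, lists, and maps all expose their children via fields()
       if (type.isNestedType()) {
         return type.asNestedType().fields().stream()
             .map(Types.NestedField::type)
             .anyMatch(NestedVariantCheck::containsVariant);
       }
       return false;
     }
   }
   ```
   Detection alone probably wouldn't be enough, though: `constructVariantShreddingFunc` reads the sample via `sampleRecord.getField(name)`, which only resolves top-level fields, so sampling a nested variant would also need a path-aware lookup.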
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java:
##########
@@ -149,4 +159,14 @@ HiveFileWriterFactory build() {
           positionDeleteRowSchema);
     }
   }
+
+  /**
+   * Set a sample record to use for data-driven variant shredding schema generation.
+   * Should be called before the Parquet writer is created.
+   */
+  public void initialize(Supplier<Record> record) {
+    if (sampleRecord == null) {
+      sampleRecord = record;

Review Comment:
   Thinking about this: we are taking the first record. Can it lead to task-level non-determinism? Like one insert fanning out into multiple tasks, with each task capturing its own first record and schema.
   But I think there wasn't a better way; maybe in some later world we allow the users themselves to define the columns to be shredded.
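   Editor's note: a sketch of that later world, under the assumption that the shredding decision would come from an explicit table property rather than from whichever record a task happens to sample first, so every task of one insert makes the same choice. The property name `variant.shredding.columns` does not exist in this PR or in Iceberg; it is invented for illustration:
   ```java
   import java.util.Arrays;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   final class UserDefinedShredding {

     // Hypothetical property, e.g. 'variant.shredding.columns'='payload,meta'.
     private static final String SHRED_COLUMNS = "variant.shredding.columns";

     private UserDefinedShredding() {
     }

     // Parses the column list once from table metadata; identical on every task,
     // unlike a per-task "first record" sample.
     static Set<String> shreddedColumns(Map<String, String> tableProperties) {
       String value = tableProperties.getOrDefault(SHRED_COLUMNS, "");
       return Arrays.stream(value.split(","))
           .map(String::trim)
           .filter(s -> !s.isEmpty())
           .collect(Collectors.toSet());
     }
   }
   ```
   This would pin down *which* columns get shredded; the *shape* of each shredded column would still have to come from the data or from a user-supplied schema.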
##########
iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q:
##########
@@ -0,0 +1,39 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask random snapshot id
+--! qt:replace:/('current-snapshot-id'=')\d+/$1#SnapshotId#/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/('current-snapshot-timestamp-ms'=')\d+/$1#Masked#/
+
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+
+drop table if exists tbl_shredded_variant;
+
+-- Create test table
+CREATE EXTERNAL TABLE tbl_shredded_variant (
+  id INT,
+  data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+  'format-version'='3',
+  'variant.shredding.enabled'='true'
+);
+
+-- Insert JSON structures
+INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'));
+
+-- Disable vectorized execution until Variant type is supported
+set hive.vectorized.execution.enabled=false;

Review Comment:
   Removing this doesn't throw any exception, and in some cases it actually gives wrong results. Ideally we should fall back if we have a shredded variant column and vectorization is enabled.
   In your test, without this line you get:
   ```
   Caused by: java.lang.RuntimeException: MALFORMED_VARIANT
       at org.apache.hadoop.hive.serde2.variant.VariantUtil.malformedVariant(VariantUtil.java:180)
       at org.apache.hadoop.hive.serde2.variant.Variant.convertToByteArray(Variant.java:81)
       at org.apache.hadoop.hive.serde2.variant.Variant.from(Variant.java:59)
       at org.apache.hadoop.hive.ql.udf.generic.GenericUDFVariantGet.evaluate(GenericUDFVariantGet.java:102)
   ```
   I tried another case where it silently gives wrong results:
   ```
   CREATE TABLE t (
     id INT,
     v VARIANT
   ) STORED BY ICEBERG
   TBLPROPERTIES (
     'format-version'='3',
     'variant.shredding.enabled'='true'
   );

   INSERT INTO t VALUES
   (1, parse_json('{"a": 1}')),
   (2, parse_json('{"b": 2}'));

   SELECT try_variant_get(v, '$.a'), try_variant_get(v, '$.b')
   FROM t
   ORDER BY id;
   ```
   With vectorization off:
   ```
   1       NULL
   NULL    2
   ```
   With vectorization on:
   ```
   NULL    2
   NULL    NULL
   ```
   Without shredding but with vectorization on:
   ```
   1       NULL
   NULL    2
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java:
##########
@@ -0,0 +1,103 @@
+  public static Optional<VariantShreddingFunction> variantShreddingFunc(
+      Schema schema,
+      Supplier<Record> sampleRecord,
+      Map<String, String> properties) {
+
+    // Preconditions: must have variant columns + property enabled
+    if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {

Review Comment:
   Not a big thing, but I feel in general the check should be flipped:
   ```
   if (!isVariantShreddingEnabled(properties) || !hasVariantColumns(schema))
   ```
   If `isVariantShreddingEnabled` is false, we need not parse the schema at all. As written, even when shredding isn't enabled, the first condition of the `||` is still evaluated first for no reason, and that iterates over all the schema's columns.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
