ahmedabu98 commented on code in PR #37701: URL: https://github.com/apache/beam/pull/37701#discussion_r2983259145
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java: ########## @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.metrics.Metrics.counter; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.BeamParquetInputFile; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hasher; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with + * partition metadata and metrics, then commits them to an Iceberg {@link Table}. + */ +public class AddFiles extends PTransform<PCollection<String>, PCollectionRowTuple> { + private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); + private static final Counter numFilesAdded = counter(AddFiles.class, "numFilesAdded"); + private static final Counter numErrorFiles = counter(AddFiles.class, "numErrorFiles"); + private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class); + private static final int DEFAULT_FILES_TRIGGER = 1_000; + static final Schema ERROR_SCHEMA = + Schema.builder().addStringField("file").addStringField("error").build(); + private final IcebergCatalogConfig catalogConfig; + private final String tableIdentifier; + private final Duration intervalTrigger; + private final int numFilesTrigger; + private final @Nullable String locationPrefix; + private final @Nullable List<String> partitionFields; + private final @Nullable Map<String, String> tableProps; + + public AddFiles( + IcebergCatalogConfig catalogConfig, + String tableIdentifier, + @Nullable String locationPrefix, + @Nullable List<String> partitionFields, + @Nullable Map<String, String> tableProps, + @Nullable Integer numFilesTrigger, + @Nullable Duration intervalTrigger) { + this.catalogConfig = catalogConfig; + this.tableIdentifier = tableIdentifier; + this.partitionFields = partitionFields; + this.tableProps = tableProps; + this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; + this.numFilesTrigger = numFilesTrigger != null ? numFilesTrigger : DEFAULT_FILES_TRIGGER; + this.locationPrefix = locationPrefix; + } + + @Override + public PCollectionRowTuple expand(PCollection<String> input) { + LOG.info( + "AddFiles configured to commit after accumulating {} files, or after {} seconds.", + numFilesTrigger, + intervalTrigger.getStandardSeconds()); + if (!Strings.isNullOrEmpty(locationPrefix)) { + LOG.info( + "AddFiles configured to build partition metadata after the prefix: '{}'", locationPrefix); + } + + PCollectionTuple dataFiles = + input.apply( + "ConvertToDataFiles", + ParDo.of( + new ConvertToDataFile( + catalogConfig, + tableIdentifier, + locationPrefix, + partitionFields, + tableProps)) + .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS))); + SchemaCoder<SerializableDataFile> sdfSchema; + try { + sdfSchema = SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + + PCollection<KV<Void, SerializableDataFile>> keyedFiles = + dataFiles + .get(DATA_FILES) + .setCoder(sdfSchema) + .apply("AddStaticKey", WithKeys.of((Void) null)); + + PCollection<KV<Void, Iterable<SerializableDataFile>>> groupedFiles = + keyedFiles.isBounded().equals(BOUNDED) + ? keyedFiles.apply(GroupByKey.create()) + : keyedFiles.apply( + GroupIntoBatches.<Void, SerializableDataFile>ofSize(numFilesTrigger) + .withMaxBufferingDuration(intervalTrigger)); + + PCollection<Row> snapshots = + groupedFiles + .apply( + "CommitFilesToIceberg", + ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier))) + .setRowSchema(SnapshotInfo.getSchema()); + + return PCollectionRowTuple.of( + "snapshots", snapshots, "errors", dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA)); + } + + static class ConvertToDataFile extends DoFn<String, SerializableDataFile> { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + public static final TupleTag<Row> ERRORS = new TupleTag<>(); + public static final TupleTag<SerializableDataFile> DATA_FILES = new TupleTag<>(); + private final @Nullable String prefix; + private final @Nullable List<String> partitionFields; + private final @Nullable Map<String, String> tableProps; + private transient @MonotonicNonNull Table table; + + public ConvertToDataFile( + IcebergCatalogConfig catalogConfig, + String identifier, + @Nullable String prefix, + @Nullable List<String> partitionFields, + @Nullable Map<String, String> tableProps) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.prefix = prefix; + this.partitionFields = partitionFields; + this.tableProps = tableProps; + } + + private static final String PREFIX_ERROR = "File path did not start with the specified prefix"; + private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; + static final String UNKNOWN_PARTITION_ERROR = "Could not determine the file's partition"; + + @ProcessElement + public void process(@Element String filePath, MultiOutputReceiver output) throws IOException { + FileFormat format; + try { + format = inferFormat(filePath); + } catch (UnknownFormatException e) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + numErrorFiles.inc(); + return; + } + + if (table == null) { + table = getOrCreateTable(getSchema(filePath, format)); + } + + // Check if the file path contains the provided prefix + if (table.spec().isPartitioned() + && !Strings.isNullOrEmpty(prefix) + && !filePath.startsWith(checkStateNotNull(prefix))) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + numErrorFiles.inc(); + return; + } + + InputFile inputFile = table.io().newInputFile(filePath); + + Metrics metrics = + getFileMetrics( + inputFile, format, MetricsConfig.forTable(table), MappingUtil.create(table.schema())); + + // Figure out which partition this DataFile should go to + String partitionPath; + if (table.spec().isUnpartitioned()) { + partitionPath = ""; + } else if (!Strings.isNullOrEmpty(prefix)) { + // option 1: use directory structure to determine partition + // Note: we don't validate the DataFile content here + partitionPath = getPartitionFromFilePath(filePath); + } else { + try { + // option 2: examine DataFile min/max statistics to determine partition + partitionPath = getPartitionFromMetrics(metrics, inputFile); + } catch (UnknownPartitionException e) { + output + .get(ERRORS) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(filePath, UNKNOWN_PARTITION_ERROR + ": " + e.getMessage()) + .build()); + numErrorFiles.inc(); + return; + } + } + + DataFile df = + DataFiles.builder(table.spec()) + .withPath(filePath) + .withFormat(format) + .withMetrics(metrics) + .withFileSizeInBytes(inputFile.getLength()) + .withPartitionPath(partitionPath) + .build(); + + output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); + } + + private <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer bytes) { + return transform.bind(type).apply(Conversions.fromByteBuffer(type, bytes)); + } + + private Table getOrCreateTable(org.apache.iceberg.Schema schema) { + PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); + try { + return tableProps == null + ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + } catch (AlreadyExistsException e) { // if table already exists, just load it + return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + + /** + * We don't have a table yet, so we don't know which FileIO to use to read these files. Instead, + * we use Beam's FileSystem utilities to read the file and extract its schema to create the + * table + */ + private static org.apache.iceberg.Schema getSchema(String filePath, FileFormat format) + throws IOException { + Preconditions.checkArgument( + format.equals(FileFormat.PARQUET), "Table creation is only supported for Parquet files."); + try (ParquetFileReader reader = ParquetFileReader.open(getParquetInputFile(filePath))) { + MessageType messageType = reader.getFooter().getFileMetaData().getSchema(); + return ParquetSchemaUtil.convert(messageType); + } + } + + private String getPartitionFromFilePath(String filePath) { + if (checkStateNotNull(table).spec().isUnpartitioned()) { + return ""; + } + String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); + int lastSlashIndex = partitionPath.lastIndexOf('/'); + + return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) : ""; + } + + /** + * Examines the min/max values of each partition column to determine the destination partition. + * + * <p>If the transformed min/max values are not equal for any given column, we won't be able to + * determine the partition. In this case, we output the DataFile to the DLQ, because assigning + * an incorrect partition may lead to it being hidden from some queries. + */ + private String getPartitionFromMetrics(Metrics metrics, InputFile inputFile) + throws UnknownPartitionException, IOException { + Table table = checkStateNotNull(this.table); + List<PartitionField> fields = table.spec().fields(); + List<Integer> sourceIds = + fields.stream().map(PartitionField::sourceId).collect(Collectors.toList()); + Metrics partitionMetrics; + // Check if metrics already includes partition columns (configured by table properties): + if (metrics.lowerBounds().keySet().containsAll(sourceIds) + && metrics.upperBounds().keySet().containsAll(sourceIds)) { + partitionMetrics = metrics; + } else { + // Otherwise, recollect metrics and ensure it includes all partition fields. + // Note: we don't attach these additional metrics to the DataFile because we can't assume + // that's in the user's best interest. + // Some tables are very wide and users may not want to store excessive metadata. + List<String> sourceNames = + fields.stream() + .map(pf -> checkStateNotNull(table.schema().idToName().get(pf.sourceId()))) + .collect(Collectors.toList()); + Map<String, String> configProps = + sourceNames.stream() + .collect(Collectors.toMap(s -> "write.metadata.metrics.column." + s, s -> "full")); + MetricsConfig configWithPartitionFields = MetricsConfig.fromProperties(configProps); + partitionMetrics = + getFileMetrics( + inputFile, + inferFormat(inputFile.location()), + configWithPartitionFields, + MappingUtil.create(table.schema())); + } + + PartitionKey pk = new PartitionKey(table.spec(), table.schema()); + + for (int i = 0; i < fields.size(); i++) { + PartitionField field = fields.get(i); + Type type = table.schema().findType(field.sourceId()); + Transform<?, ?> transform = field.transform(); + + // Make a best effort estimate by comparing the lower and upper transformed values. + // If the transformed values are equal, assume that the DataFile's data safely + // aligns with the same partition. + // TODO(ahmedabu98): is comparing min/max safe enough for bucketing transform? Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
