This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b6d8f428e29 Iceberg Add Files (#37701)
b6d8f428e29 is described below
commit b6d8f428e2966468abfababc03440f82a1cb8e59
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Mar 25 10:44:45 2026 -0400
Iceberg Add Files (#37701)
* add files transform and schematransform
* minor fixes
* add tests
* create table if needed; determine partition spec from metrics
* spotless
* add batch route as well
* spotless
* extract SchemaTransform logic out
* add integration tests
* spotless
* spotless
* fix deps
* add a few more tests; add error handling output
* clarify comments
* add comment
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
sdks/java/io/iceberg/build.gradle | 7 +-
.../org/apache/beam/sdk/io/iceberg/AddFiles.java | 671 +++++++++++++++++++++
.../iceberg/AddFilesSchemaTransformProvider.java | 190 ++++++
.../apache/beam/sdk/io/iceberg/PartitionUtils.java | 5 +-
.../org/apache/beam/sdk/io/iceberg/ReadUtils.java | 30 +
.../org/apache/beam/sdk/io/iceberg/AddFilesIT.java | 536 ++++++++++++++++
.../apache/beam/sdk/io/iceberg/AddFilesTest.java | 655 ++++++++++++++++++++
.../org/apache/beam/sdk/io/parquet/ParquetIO.java | 4 +-
9 files changed, 2095 insertions(+), 5 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 4
+ "modification": 1
}
diff --git a/sdks/java/io/iceberg/build.gradle
b/sdks/java/io/iceberg/build.gradle
index a1f352d0530..bbd55fee2fc 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -51,6 +51,8 @@ dependencies {
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
+ implementation "org.apache.parquet:parquet-common:$parquet_version"
+ implementation project(":sdks:java:io:parquet")
implementation "org.apache.orc:orc-core:$orc_version"
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
@@ -74,11 +76,13 @@ dependencies {
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_util_hadoop
testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
- testImplementation "org.apache.parquet:parquet-common:$parquet_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
testImplementation
project(":sdks:java:extensions:google-cloud-platform-core")
testImplementation library.java.junit
+ testImplementation library.java.hamcrest
+ testImplementation project(path: ":sdks:java:extensions:avro")
+ testImplementation 'org.awaitility:awaitility:4.2.0'
// Hive catalog test dependencies
testImplementation project(path: ":sdks:java:io:iceberg:hive")
@@ -95,6 +99,7 @@ dependencies {
testImplementation project(path: ":sdks:java:io:iceberg:bqms",
configuration: "shadow")
testImplementation project(":sdks:java:io:google-cloud-platform")
testImplementation library.java.google_api_services_bigquery
+ testImplementation 'com.google.cloud:google-cloud-storage'
testImplementation library.java.google_auth_library_oauth2_http
testRuntimeOnly library.java.slf4j_jdk14
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
new file mode 100644
index 00000000000..4a164700099
--- /dev/null
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES;
+import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS;
+import static org.apache.beam.sdk.metrics.Metrics.counter;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.BeamParquetInputFile;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hasher;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A transform that takes in a stream of file paths, converts them to Iceberg
{@link DataFile}s with
+ * partition metadata and metrics, then commits them to an Iceberg {@link
Table}.
+ */
+public class AddFiles extends PTransform<PCollection<String>,
PCollectionRowTuple> {
+ static final String OUTPUT_TAG = "snapshots";
+ static final String ERROR_TAG = "errors";
+ private static final Duration DEFAULT_TRIGGER_INTERVAL =
Duration.standardMinutes(10);
+ private static final Counter numFilesAdded = counter(AddFiles.class,
"numFilesAdded");
+ private static final Counter numErrorFiles = counter(AddFiles.class,
"numErrorFiles");
+ private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class);
+ private static final int DEFAULT_FILES_TRIGGER = 1_000;
+ static final Schema ERROR_SCHEMA =
+ Schema.builder().addStringField("file").addStringField("error").build();
+ private final IcebergCatalogConfig catalogConfig;
+ private final String tableIdentifier;
+ private final Duration intervalTrigger;
+ private final int numFilesTrigger;
+ private final @Nullable String locationPrefix;
+ private final @Nullable List<String> partitionFields;
+ private final @Nullable Map<String, String> tableProps;
+
+ public AddFiles(
+ IcebergCatalogConfig catalogConfig,
+ String tableIdentifier,
+ @Nullable String locationPrefix,
+ @Nullable List<String> partitionFields,
+ @Nullable Map<String, String> tableProps,
+ @Nullable Integer numFilesTrigger,
+ @Nullable Duration intervalTrigger) {
+ this.catalogConfig = catalogConfig;
+ this.tableIdentifier = tableIdentifier;
+ this.partitionFields = partitionFields;
+ this.tableProps = tableProps;
+ this.intervalTrigger = intervalTrigger != null ? intervalTrigger :
DEFAULT_TRIGGER_INTERVAL;
+ this.numFilesTrigger = numFilesTrigger != null ? numFilesTrigger :
DEFAULT_FILES_TRIGGER;
+ this.locationPrefix = locationPrefix;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollection<String> input) {
+ LOG.info(
+ "AddFiles configured to commit after accumulating {} files, or after
{} seconds.",
+ numFilesTrigger,
+ intervalTrigger.getStandardSeconds());
+ if (!Strings.isNullOrEmpty(locationPrefix)) {
+ LOG.info(
+ "AddFiles configured to build partition metadata after the prefix:
'{}'", locationPrefix);
+ }
+
+ PCollectionTuple dataFiles =
+ input.apply(
+ "ConvertToDataFiles",
+ ParDo.of(
+ new ConvertToDataFile(
+ catalogConfig,
+ tableIdentifier,
+ locationPrefix,
+ partitionFields,
+ tableProps))
+ .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS)));
+ SchemaCoder<SerializableDataFile> sdfSchema;
+ try {
+ sdfSchema =
SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ PCollection<KV<Void, SerializableDataFile>> keyedFiles =
+ dataFiles
+ .get(DATA_FILES)
+ .setCoder(sdfSchema)
+ .apply("AddStaticKey", WithKeys.of((Void) null));
+
+ PCollection<KV<Void, Iterable<SerializableDataFile>>> groupedFiles =
+ keyedFiles.isBounded().equals(BOUNDED)
+ ? keyedFiles.apply(GroupByKey.create())
+ : keyedFiles.apply(
+ GroupIntoBatches.<Void,
SerializableDataFile>ofSize(numFilesTrigger)
+ .withMaxBufferingDuration(intervalTrigger));
+
+ PCollection<Row> snapshots =
+ groupedFiles
+ .apply(
+ "CommitFilesToIceberg",
+ ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier)))
+ .setRowSchema(SnapshotInfo.getSchema());
+
+ return PCollectionRowTuple.of(
+ OUTPUT_TAG, snapshots, ERROR_TAG,
dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA));
+ }
+
+ static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
+ private final IcebergCatalogConfig catalogConfig;
+ private final String identifier;
+ public static final TupleTag<Row> ERRORS = new TupleTag<>();
+ public static final TupleTag<SerializableDataFile> DATA_FILES = new
TupleTag<>();
+ private final @Nullable String prefix;
+ private final @Nullable List<String> partitionFields;
+ private final @Nullable Map<String, String> tableProps;
+ private transient @MonotonicNonNull Table table;
+ // Limit open readers to avoid blowing up memory on one worker
+ private static final int MAX_READERS = 10;
+ private static final Semaphore ACTIVE_READERS = new Semaphore(MAX_READERS);
+
+ public ConvertToDataFile(
+ IcebergCatalogConfig catalogConfig,
+ String identifier,
+ @Nullable String prefix,
+ @Nullable List<String> partitionFields,
+ @Nullable Map<String, String> tableProps) {
+ this.catalogConfig = catalogConfig;
+ this.identifier = identifier;
+ this.prefix = prefix;
+ this.partitionFields = partitionFields;
+ this.tableProps = tableProps;
+ }
+
+ static final String PREFIX_ERROR = "File path did not start with the
specified prefix";
+ private static final String UNKNOWN_FORMAT_ERROR = "Could not determine
the file's format";
+ static final String UNKNOWN_PARTITION_ERROR = "Could not determine the
file's partition: ";
+
+ @ProcessElement
+ public void process(@Element String filePath, MultiOutputReceiver output)
+ throws IOException, InterruptedException {
+ FileFormat format;
+ try {
+ format = inferFormat(filePath);
+ } catch (UnknownFormatException e) {
+ output
+ .get(ERRORS)
+ .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath,
UNKNOWN_FORMAT_ERROR).build());
+ numErrorFiles.inc();
+ return;
+ }
+
+ if (table == null) {
+ table = getOrCreateTable(getSchema(filePath, format));
+ }
+
+ // Check if the file path contains the provided prefix
+ if (table.spec().isPartitioned()
+ && !Strings.isNullOrEmpty(prefix)
+ && !filePath.startsWith(checkStateNotNull(prefix))) {
+ output
+ .get(ERRORS)
+ .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath,
PREFIX_ERROR).build());
+ numErrorFiles.inc();
+ return;
+ }
+
+ InputFile inputFile = table.io().newInputFile(filePath);
+
+ Metrics metrics =
+ getFileMetrics(
+ inputFile, format, MetricsConfig.forTable(table),
MappingUtil.create(table.schema()));
+
+ // Figure out which partition this DataFile should go to
+ String partitionPath;
+ if (table.spec().isUnpartitioned()) {
+ partitionPath = "";
+ } else if (!Strings.isNullOrEmpty(prefix)) {
+ // option 1: use directory structure to determine partition
+ // Note: we don't validate the DataFile content here
+ partitionPath = getPartitionFromFilePath(filePath);
+ } else {
+ try {
+ // option 2: examine DataFile min/max statistics to determine
partition
+ partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
+ } catch (UnknownPartitionException e) {
+ output
+ .get(ERRORS)
+ .output(
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(filePath, UNKNOWN_PARTITION_ERROR +
e.getMessage())
+ .build());
+ numErrorFiles.inc();
+ return;
+ }
+ }
+
+ DataFile df =
+ DataFiles.builder(table.spec())
+ .withPath(filePath)
+ .withFormat(format)
+ .withMetrics(metrics)
+ .withFileSizeInBytes(inputFile.getLength())
+ .withPartitionPath(partitionPath)
+ .build();
+
+ output.get(DATA_FILES).output(SerializableDataFile.from(df,
partitionPath));
+ }
+
+ static <W, T> T transformValue(Transform<W, T> transform, Type type,
ByteBuffer bytes) {
+ return transform.bind(type).apply(Conversions.fromByteBuffer(type,
bytes));
+ }
+
+ private static <W, T> T transformValue(Transform<W, T> transform, Type
type, Object value) {
+ return transform.bind(type).apply((W) value);
+ }
+
+ private Table getOrCreateTable(org.apache.iceberg.Schema schema) {
+ PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields,
schema);
+ try {
+ return tableProps == null
+ ?
catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema,
spec)
+ : catalogConfig
+ .catalog()
+ .createTable(TableIdentifier.parse(identifier), schema, spec,
tableProps);
+ } catch (AlreadyExistsException e) { // if table already exists, just
load it
+ return
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+ }
+ }
+
+ /**
+ * We don't have a table yet, so we don't know which FileIO to use to read
these files. Instead,
+ * we use Beam's FileSystem utilities to read the file and extract its
schema to create the
+ * table
+ */
+ private static org.apache.iceberg.Schema getSchema(String filePath,
FileFormat format)
+ throws IOException {
+ Preconditions.checkArgument(
+ format.equals(FileFormat.PARQUET), "Table creation is only supported
for Parquet files.");
+ try (ParquetFileReader reader =
ParquetFileReader.open(getParquetInputFile(filePath))) {
+ MessageType messageType =
reader.getFooter().getFileMetaData().getSchema();
+ return ParquetSchemaUtil.convert(messageType);
+ }
+ }
+
+ private String getPartitionFromFilePath(String filePath) {
+ if (checkStateNotNull(table).spec().isUnpartitioned()) {
+ return "";
+ }
+ String partitionPath =
filePath.substring(checkStateNotNull(prefix).length());
+ int lastSlashIndex = partitionPath.lastIndexOf('/');
+
+ return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) :
"";
+ }
+
+ /**
+ * Examines the min/max values of each partition column to determine the
destination partition.
+ *
+ * <p>If the transformed min/max values are not equal for any given
column, we won't be able to
+ * determine the partition. We also cannot fall back to a "null"
partition, because that will
+ * also get skipped by most queries.
+ *
+ * <p>The Bucket partition transform is an exceptional case because it is
not monotonic, meaning
+ * it's not enough to just compare the min and max values. There may be a
middle value somewhere
+ * that gets hashed to a different value. For this transform, we'll need
to read all the values
+ * in the column ensure they all get transformed to the same partition
value.
+ *
+ * <p>In these cases, we output the DataFile to the DLQ, because assigning
an incorrect
+ * partition may lead to it being incorrectly ignored by downstream
queries.
+ */
+ static String getPartitionFromMetrics(Metrics metrics, InputFile
inputFile, Table table)
+ throws UnknownPartitionException, IOException, InterruptedException {
+ List<PartitionField> fields = table.spec().fields();
+ List<Integer> sourceIds =
+
fields.stream().map(PartitionField::sourceId).collect(Collectors.toList());
+ Metrics partitionMetrics;
+ // Check if metrics already includes partition columns (configured by
table properties):
+ if (metrics.lowerBounds().keySet().containsAll(sourceIds)
+ && metrics.upperBounds().keySet().containsAll(sourceIds)) {
+ partitionMetrics = metrics;
+ } else {
+ // Otherwise, recollect metrics and ensure it includes all partition
fields.
+ // Note: we don't attach these additional metrics to the DataFile
because we can't assume
+ // that's in the user's best interest.
+ // Some tables are very wide and users may not want to store excessive
metadata.
+ List<String> sourceNames =
+ fields.stream()
+ .map(pf -> table.schema().findColumnName(pf.sourceId()))
+ .collect(Collectors.toList());
+ Map<String, String> configProps =
+ sourceNames.stream()
+ .collect(Collectors.toMap(s ->
"write.metadata.metrics.column." + s, s -> "full"));
+ MetricsConfig configWithPartitionFields =
MetricsConfig.fromProperties(configProps);
+ partitionMetrics =
+ getFileMetrics(
+ inputFile,
+ inferFormat(inputFile.location()),
+ configWithPartitionFields,
+ MappingUtil.create(table.schema()));
+ }
+
+ PartitionKey pk = new PartitionKey(table.spec(), table.schema());
+
+ HashMap<Integer, PartitionField> bucketPartitions = new HashMap<>();
+ for (int i = 0; i < fields.size(); i++) {
+ PartitionField field = fields.get(i);
+ Transform<?, ?> transform = field.transform();
+ if (transform.toString().contains("bucket[")) {
+ bucketPartitions.put(i, field);
+ }
+ }
+
+ // first, read only metadata for the non-bucket partition types
+ for (int i = 0; i < fields.size(); i++) {
+ PartitionField field = fields.get(i);
+ // skip bucket partitions (we will process them below)
+ if (bucketPartitions.containsKey(i)) {
+ continue;
+ }
+ Type type = table.schema().findType(field.sourceId());
+ Transform<?, ?> transform = field.transform();
+
+ // Make a best effort estimate by comparing the lower and upper
transformed values.
+ // If the transformed values are equal, assume that the DataFile's
data safely
+ // aligns with the same partition.
+ ByteBuffer lowerBytes =
partitionMetrics.lowerBounds().get(field.sourceId());
+ ByteBuffer upperBytes =
partitionMetrics.upperBounds().get(field.sourceId());
+ if (lowerBytes == null && upperBytes == null) {
+ continue;
+ } else if (lowerBytes == null || upperBytes == null) {
+ throw new UnknownPartitionException(
+ "Only one of the min/max was was null, for field "
+ + table.schema().findColumnName(field.sourceId()));
+ }
+ Object lowerTransformedValue = transformValue(transform, type,
lowerBytes);
+ Object upperTransformedValue = transformValue(transform, type,
upperBytes);
+
+ if (!Objects.deepEquals(lowerTransformedValue, upperTransformedValue))
{
+ // The DataFile contains values that align to different partitions,
so we cannot
+ // safely determine a partition.
+ throw new UnknownPartitionException(
+ "Min and max transformed values were not equal, for column: " +
field.name());
+ }
+
+ pk.set(i, lowerTransformedValue);
+ }
+
+ // bucket transform needs extra processing (see java doc above)
+ if (!bucketPartitions.isEmpty()) {
+ // Optimize by only reading bucket-transformed columns into memory
+ org.apache.iceberg.Schema bucketCols =
+ TypeUtil.select(
+ table.schema(),
+ bucketPartitions.values().stream()
+ .map(PartitionField::sourceId)
+ .collect(Collectors.toSet()));
+
+ // Keep one instance of transformed value per column. Use this to
compare against each
+ // record's transformed value.
+ // Values in the same columns must yield the same transformed value,
otherwise we cannot
+ // determine a partition
+ // from this file.
+ Map<Integer, Object> transformedValues = new HashMap<>();
+
+ // Do a one-time read of the file and compare all bucket-transformed
columns
+ ACTIVE_READERS.acquire();
+ try (CloseableIterable<Record> reader =
ReadUtils.createReader(inputFile, bucketCols)) {
+ for (Record record : reader) {
+ for (Map.Entry<Integer, PartitionField> entry :
bucketPartitions.entrySet()) {
+ int partitionIndex = entry.getKey();
+ PartitionField partitionField = entry.getValue();
+ Transform<?, ?> transform = partitionField.transform();
+ Types.NestedField field =
table.schema().findField(partitionField.sourceId());
+ Object value = record.getField(field.name());
+
+ // set initial transformed value for this column
+ @Nullable Object transformedValue =
transformedValues.get(partitionIndex);
+ Object currentTransformedValue = transformValue(transform,
field.type(), value);
+ if (transformedValue == null) {
+ transformedValues.put(partitionIndex,
checkStateNotNull(currentTransformedValue));
+ continue;
+ }
+
+ if (!Objects.deepEquals(currentTransformedValue,
transformedValue)) {
+ throw new UnknownPartitionException(
+ "Found records with conflicting transformed values, for
column: "
+ + field.name());
+ }
+ }
+ }
+ } finally {
+ ACTIVE_READERS.release();
+ }
+
+ for (Map.Entry<Integer, Object> partitionCol :
transformedValues.entrySet()) {
+ pk.set(partitionCol.getKey(), partitionCol.getValue());
+ }
+ }
+ return pk.toPath();
+ }
+ }
+
+ /**
+ * A stateful {@link DoFn} that commits batches of files to an Iceberg table.
+ *
+ * <p>Addresses two primary concerns:
+ *
+ * <ul>
+ * <li><b>Concurrency:</b> Being stateful on a dummy {@code Void} key
forces the runner to
+ * process batches sequentially, preventing concurrent commit
conflicts on the Iceberg
+ * table.
+ * <li><b>Idempotency:</b> Prevents duplicate commits during bundle
failures by calculating a
+ * deterministic hash for the file set. This ID is stored in the
Iceberg {@code Snapshot}
+ * summary, under the key {@code "beam.add-files-commit-id"}. Before
committing, the DoFn
+ * travereses backwards through recent snapshots to check if the
current batch's ID is
+ * already present.
+ * </ul>
+ *
+ * <p>Outputs the resulting Iceberg {@link Snapshot} information.
+ */
+ static class CommitFilesDoFn extends DoFn<KV<Void,
Iterable<SerializableDataFile>>, Row> {
+ private final IcebergCatalogConfig catalogConfig;
+ private final String identifier;
+ private transient @MonotonicNonNull Table table = null;
+ private static final String COMMIT_ID_KEY = "beam.add-files-commit-id";
+
+ @StateId("lastCommitTimestamp")
+ private final StateSpec<ValueState<Long>> lastCommitTimestamp =
+ StateSpecs.value(VarLongCoder.of());
+
+ public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String
identifier) {
+ this.catalogConfig = catalogConfig;
+ this.identifier = identifier;
+ }
+
+ @StartBundle
+ public void start() {
+ if (table == null) {
+ table =
catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+ }
+ }
+
+ @ProcessElement
+ public void process(
+ @Element KV<Void, Iterable<SerializableDataFile>> files,
+ @AlwaysFetched @StateId("lastCommitTimestamp") ValueState<Long>
lastCommitTimestamp,
+ OutputReceiver<Row> output) {
+ String commitId = commitHash(files.getValue());
+ Table table = checkStateNotNull(this.table);
+ table.refresh();
+
+ if (shouldSkip(commitId, lastCommitTimestamp.read())) {
+ return;
+ }
+
+ int numFiles = 0;
+ AppendFiles appendFiles = table.newFastAppend();
+ for (SerializableDataFile file : files.getValue()) {
+ DataFile df = file.createDataFile(table.specs());
+ appendFiles.appendFile(df);
+ numFiles++;
+ }
+ appendFiles.set(COMMIT_ID_KEY, commitId);
+ LOG.info("Committing {} files, with commit ID: {}", numFiles, commitId);
+ appendFiles.commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ output.output(SnapshotInfo.fromSnapshot(snapshot).toRow());
+ lastCommitTimestamp.write(snapshot.timestampMillis());
+ numFilesAdded.inc(numFiles);
+ }
+
+ private String commitHash(Iterable<SerializableDataFile> files) {
+ Hasher hasher = Hashing.sha256().newHasher();
+
+ // Extract, sort, and hash to ensure deterministic output
+ List<String> paths = new ArrayList<>();
+ for (SerializableDataFile file : files) {
+ paths.add(file.getPath());
+ }
+ Collections.sort(paths);
+
+ for (String path : paths) {
+ hasher.putString(path, StandardCharsets.UTF_8);
+ }
+ return hasher.hash().toString();
+ }
+
+ /**
+ * Performs a look-back through Iceberg table history to determine if this
specific batch of
+ * files has already been successfully committed.
+ */
+ private boolean shouldSkip(String commitUID, @Nullable Long
lastCommitTimestamp) {
+ if (lastCommitTimestamp == null) {
+ return false;
+ }
+ Table table = checkStateNotNull(this.table);
+
+ // check past snapshots to see if they contain the commit ID
+ @Nullable Snapshot current = table.currentSnapshot();
+ while (current != null && current.timestampMillis() >
lastCommitTimestamp) {
+ Map<String, String> summary = current.summary();
+ if (summary != null && commitUID.equals(summary.get(COMMIT_ID_KEY))) {
+ return true; // commit already happened, we should skip
+ }
+ if (current.parentId() == null) {
+ break;
+ }
+ current = table.snapshot(current.parentId());
+ }
+
+ return false;
+ }
+ }
+
+ @SuppressWarnings("argument")
+ public static Metrics getFileMetrics(
+ InputFile file, FileFormat format, MetricsConfig config, NameMapping
mapping)
+ throws IOException {
+ switch (format) {
+ case PARQUET:
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(getParquetInputFile(file.location()))) {
+ ParquetMetadata footer = reader.getFooter();
+ MessageType originalMessageType =
footer.getFileMetaData().getSchema();
+ if (!ParquetSchemaUtil.hasIds(originalMessageType)) {
+ footer = getFooterWithTypeIds(originalMessageType, footer,
mapping);
+ }
+
+ return ParquetUtil.footerMetrics(footer, Stream.empty(), config,
mapping);
+ }
+ case ORC:
+ return OrcMetrics.fromInputFile(file, config, mapping);
+ case AVRO:
+ return new Metrics(Avro.rowCount(file), null, null, null, null);
+ default:
+ throw new UnsupportedOperationException("Unsupported format: " +
format);
+ }
+ }
+
+ /** Tries to infer other file formats. Defaults to Parquet. */
+ public static FileFormat inferFormat(String path) {
+ String lowerPath = path.toLowerCase();
+
+ if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".pqt")) {
+ return FileFormat.PARQUET;
+ } else if (lowerPath.endsWith(".orc")) {
+ return FileFormat.ORC;
+ } else if (lowerPath.endsWith(".avro")) {
+ return FileFormat.AVRO;
+ } else {
+ throw new UnknownFormatException();
+ }
+ }
+
+ static ParquetMetadata getFooterWithTypeIds(
+ MessageType originalMessageType, ParquetMetadata footer, NameMapping
mapping) {
+ originalMessageType =
ParquetSchemaUtil.applyNameMapping(originalMessageType, mapping);
+ FileMetaData oldFileMeta = footer.getFileMetaData();
+ FileMetaData newFileMeta =
+ new FileMetaData(
+ originalMessageType, oldFileMeta.getKeyValueMetaData(),
oldFileMeta.getCreatedBy());
+ return new ParquetMetadata(newFileMeta, footer.getBlocks());
+ }
+
+ static org.apache.parquet.io.InputFile getParquetInputFile(String filePath)
throws IOException {
+ ResourceId resourceId =
+
Iterables.getOnlyElement(FileSystems.match(filePath).metadata()).resourceId();
+ Compression compression =
Compression.detect(checkStateNotNull(resourceId.getFilename()));
+ SeekableByteChannel channel =
+ (SeekableByteChannel)
compression.readDecompressed(FileSystems.open(resourceId));
+ return new BeamParquetInputFile(channel);
+ }
+
+ static class UnknownFormatException extends IllegalArgumentException {}
+
+ static class UnknownPartitionException extends IllegalStateException {
+ UnknownPartitionException(String msg) {
+ super(msg);
+ }
+ }
+}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java
new file mode 100644
index 00000000000..a04853c8ad9
--- /dev/null
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.AddFiles.ERROR_TAG;
+import static org.apache.beam.sdk.io.iceberg.AddFiles.OUTPUT_TAG;
+import static
org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+@AutoService(SchemaTransformProvider.class)
+public class AddFilesSchemaTransformProvider extends
TypedSchemaTransformProvider<Configuration> {
+ @Override
+ public AddFilesSchemaTransform from(Configuration configuration) {
+ return new AddFilesSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:iceberg_add_files:v1";
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Configuration {
+ public static Builder builder() {
+ return new
AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder();
+ }
+
+ @SchemaFieldDescription("A fully-qualified table identifier.")
+ public abstract String getTable();
+
+ @SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
+ public abstract @Nullable Map<String, String> getCatalogProperties();
+
+ @SchemaFieldDescription("Properties passed to the Hadoop ")
+ public abstract @Nullable Map<String, String> getConfigProperties();
+
+ @SchemaFieldDescription(
+ "For a streaming pipeline, sets the frequency at which incoming files
are appended. Defaults to 600 (10 minutes). "
+ + "A commit is triggered when either this or append batch size is
reached.")
+ public abstract @Nullable Integer getTriggeringFrequencySeconds();
+
+ @SchemaFieldDescription(
+ "For a streaming pipeline, sets the desired number of appended files
per commit. Defaults to 100,000 files. "
+ + "A commit is triggered when either this or append triggering
interval is reached.")
+ public abstract @Nullable Integer getAppendBatchSize();
+
+ @SchemaFieldDescription(
+ "The prefix shared among all partitions. For example, a data file may
have the following"
+ + " location:%n"
+ +
"'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n"
+ + "The provided prefix should go up until the partition
information:%n"
+ + "'gs://bucket/namespace/table/data/'.%n"
+ + "If not provided, will try determining each DataFile's partition
from its metrics metadata.")
+ public abstract @Nullable String getLocationPrefix();
+
+ @SchemaFieldDescription(
+ "Fields used to create a partition spec that is applied when tables
are created. For a field 'foo', "
+ + "the available partition transforms are:\n\n"
+ + "- `foo`\n"
+ + "- `truncate(foo, N)`\n"
+ + "- `bucket(foo, N)`\n"
+ + "- `hour(foo)`\n"
+ + "- `day(foo)`\n"
+ + "- `month(foo)`\n"
+ + "- `year(foo)`\n"
+ + "- `void(foo)`\n\n"
+ + "For more information on partition transforms, please visit
https://iceberg.apache.org/spec/#partition-transforms.")
+ public abstract @Nullable List<String> getPartitionFields();
+
+ @SchemaFieldDescription(
+ "Iceberg table properties to be set on the table when it is created.\n"
+ + "For more information on table properties,"
+ + " please visit
https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
+ public abstract @Nullable Map<String, String> getTableProperties();
+
+ @SchemaFieldDescription("This option specifies whether and where to output
unwritable rows.")
+ public abstract @Nullable ErrorHandling getErrorHandling();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setTable(String table);
+
+ public abstract Builder setCatalogProperties(Map<String, String>
catalogProperties);
+
+ public abstract Builder setConfigProperties(Map<String, String>
confProperties);
+
+ public abstract Builder setTriggeringFrequencySeconds(Integer
triggeringFrequencySeconds);
+
+ public abstract Builder setAppendBatchSize(Integer size);
+
+ public abstract Builder setLocationPrefix(String prefix);
+
+ public abstract Builder setPartitionFields(List<String> fields);
+
+ public abstract Builder setTableProperties(Map<String, String> props);
+
+ public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
+ public abstract Configuration build();
+ }
+
+ public IcebergCatalogConfig getIcebergCatalog() {
+ return IcebergCatalogConfig.builder()
+ .setCatalogProperties(getCatalogProperties())
+ .setConfigProperties(getConfigProperties())
+ .build();
+ }
+ }
+
+ public static class AddFilesSchemaTransform extends SchemaTransform {
+ private final Configuration configuration;
+
+ public AddFilesSchemaTransform(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ Schema inputSchema = input.getSinglePCollection().getSchema();
+ Preconditions.checkState(
+ inputSchema.getFieldCount() == 1
+ &&
inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING),
+ "Incoming Row Schema must contain only one field of type String.
Instead, got schema: %s",
+ inputSchema);
+
+ @Nullable Integer frequency =
configuration.getTriggeringFrequencySeconds();
+
+ PCollectionRowTuple result =
+ input
+ .getSinglePCollection()
+ .apply("Filter empty paths", Filter.by(row -> row.getString(0)
!= null))
+ .apply(
+ "ExtractPaths",
+ MapElements.into(TypeDescriptors.strings())
+ .via(row -> checkStateNotNull(row.getString(0))))
+ .apply(
+ new AddFiles(
+ configuration.getIcebergCatalog(),
+ configuration.getTable(),
+ configuration.getLocationPrefix(),
+ configuration.getPartitionFields(),
+ configuration.getTableProperties(),
+ configuration.getAppendBatchSize(),
+ frequency != null ? Duration.standardSeconds(frequency)
: null));
+
+ PCollectionRowTuple output = PCollectionRowTuple.of("snapshots",
result.get(OUTPUT_TAG));
+ ErrorHandling errorHandling = configuration.getErrorHandling();
+ if (errorHandling != null) {
+ output = output.and(errorHandling.getOutput(), result.get(ERROR_TAG));
+ }
+ return output;
+ }
+ }
+}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
index 2b3117f8bf8..805cc067294 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
@@ -68,10 +68,13 @@ class PartitionUtils {
static PartitionSpec toPartitionSpec(
@Nullable List<String> fields, org.apache.beam.sdk.schemas.Schema
beamSchema) {
+ return toPartitionSpec(fields,
IcebergUtils.beamSchemaToIcebergSchema(beamSchema));
+ }
+
+ static PartitionSpec toPartitionSpec(@Nullable List<String> fields, Schema
schema) {
if (fields == null) {
return PartitionSpec.unpartitioned();
}
- Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
for (String field : fields) {
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
index a4d95ca249b..e7f50882f43 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
@@ -46,9 +46,11 @@ import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetReader;
@@ -112,6 +114,34 @@ public class ReadUtils {
true);
}
+ static ParquetReader<Record> createReader(InputFile inputFile, Schema
schema) {
+ ParquetReadOptions.Builder optionsBuilder;
+ if (inputFile instanceof HadoopInputFile) {
+ // remove read properties already set that may conflict with this read
+ Configuration conf = new Configuration(((HadoopInputFile)
inputFile).getConf());
+ for (String property : READ_PROPERTIES_TO_REMOVE) {
+ conf.unset(property);
+ }
+ optionsBuilder = HadoopReadOptions.builder(conf);
+ } else {
+ optionsBuilder = ParquetReadOptions.builder();
+ }
+ optionsBuilder =
+ optionsBuilder
+ .withRange(0, inputFile.getLength())
+ .withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE);
+
+ return new ParquetReader<>(
+ inputFile,
+ schema,
+ optionsBuilder.build(),
+ fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema),
+ MappingUtil.create(schema),
+ Expressions.alwaysTrue(),
+ false,
+ true);
+ }
+
static Map<Integer, ?> constantsMap(
FileScanTask task,
BiFunction<Type, Object, Object> converter,
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java
new file mode 100644
index 00000000000..a67707e3dbb
--- /dev/null
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;
+import static
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Notification;
+import com.google.cloud.storage.NotificationInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Deduplicate;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.hadoop.util.Lists;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.util.Pair;
+import org.awaitility.Awaitility;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for {@link AddFiles}, using Pubsub notifications to pass
newly created file
+ * paths downstream to add to an Iceberg table.
+ */
+public class AddFilesIT {
+ private static final Logger LOG = LoggerFactory.getLogger(AddFilesIT.class);
+
+ private static final String WAREHOUSE = "gs://managed-iceberg-biglake-its";
+ private static final String PROJECT =
+ TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+ @Rule public TestName testName = new TestName();
+ private final String notificationsTopic =
+ format(
+ "projects/%s/topics/%s-%s-%s",
+ PROJECT,
+ AddFilesIT.class.getSimpleName(),
+ testName.getMethodName(),
+ System.currentTimeMillis());
+ private static final Schema NOTIFICATION_SCHEMA =
+ Schema.builder()
+ .addStringField("bucket")
+ .addStringField("name")
+ .addStringField("kind")
+ .build();
+ private static final Map<String, String> BIGLAKE_PROPS =
+ Map.of(
+ "type", "rest",
+ "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
+ "warehouse", WAREHOUSE,
+ "header.x-goog-user-project", PROJECT,
+ "rest.auth.type", "google",
+ "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
+ "rest-metrics-reporting-enabled", "false");
+ private Storage storage;
+ private PubsubClient pubsub;
+ private Notification notification;
+ private final String namespace = getClass().getSimpleName();
+ private String srcTableName;
+ private String destTableName;
+ private TableIdentifier srcTableId;
+ private TableIdentifier destTableId;
+ private long salt;
+ private String dirName;
+ private static final Schema ROW_SCHEMA =
+
Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();
+ private static final List<String> PARTITION_FIELDS =
+ Arrays.asList("bucket(id, 16)", "truncate(name, 8)", "age");
+ private static final PartitionSpec SPEC =
+ PartitionUtils.toPartitionSpec(PARTITION_FIELDS, ROW_SCHEMA);
+ private static final Map<String, String> TABLE_PROPS =
ImmutableMap.of("foo", "bar");
+ private static final List<Row> TEST_ROWS =
+ IntStream.range(0, 20)
+ .mapToObj(
+ i -> Row.withSchema(ROW_SCHEMA).addValues((long) i, "name_" + i,
i + 30).build())
+ .collect(Collectors.toList());
+ private final RESTCatalog catalog = new RESTCatalog();
+
+ @Before
+ public void setup() throws IOException {
+ storage = StorageOptions.newBuilder().build().getService();
+ pubsub =
+ PubsubGrpcClient.FACTORY.newClient(
+ null, null,
TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class));
+
+ pubsub.createTopic(PubsubClient.topicPathFromPath(notificationsTopic));
+
+ NotificationInfo notificationInfo =
+ NotificationInfo.newBuilder(notificationsTopic)
+ .setEventTypes(NotificationInfo.EventType.OBJECT_FINALIZE)
+ .setPayloadFormat(NotificationInfo.PayloadFormat.JSON_API_V1)
+ .build();
+ try {
+ notification = storage.createNotification(WAREHOUSE.replace("gs://",
""), notificationInfo);
+ } catch (StorageException e) {
+ if (e.getMessage().contains("Too many overlapping notifications")) {
+ List<Notification> existing =
storage.listNotifications(WAREHOUSE.replace("gs://", ""));
+ LOG.warn(
+ "Too many notifications on bucket {}: {}. Deleting existing
notifications to make room: {}",
+ WAREHOUSE,
+ e,
+ existing.stream()
+ .map(NotificationInfo::getNotificationId)
+ .collect(Collectors.toList()));
+ existing.forEach(
+ n -> storage.deleteNotification(WAREHOUSE.replace("gs://", ""),
n.getNotificationId()));
+
+ // try creating it again
+ notification = storage.createNotification(WAREHOUSE.replace("gs://",
""), notificationInfo);
+ } else {
+ throw e;
+ }
+ }
+
+ salt = System.currentTimeMillis();
+ dirName = format("%s-%s/%s", getClass().getSimpleName(), salt,
testName.getMethodName());
+ srcTableName = "src_" + testName.getMethodName() + "_" + salt;
+ destTableName = "dest_" + testName.getMethodName() + "_" + salt;
+ srcTableId = TableIdentifier.of(namespace, srcTableName);
+ destTableId = TableIdentifier.of(namespace, destTableName);
+
+ catalog.initialize("test_catalog", BIGLAKE_PROPS);
+ cleanupCatalog();
+ catalog.createNamespace(Namespace.of(namespace));
+ }
+
+ private void cleanupCatalog() {
+ Namespace ns = Namespace.of(namespace);
+ if (catalog.namespaceExists(ns)) {
+ catalog.listTables(ns).forEach(catalog::dropTable);
+ catalog.dropNamespace(ns);
+ }
+ }
+
+ @After
+ public void cleanup() {
+ try {
+ pubsub.deleteTopic(PubsubClient.topicPathFromPath(notificationsTopic));
+ pubsub.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up PubSub", e);
+ }
+
+ try {
+ storage.deleteNotification(WAREHOUSE.replace("gs://", ""),
notification.getNotificationId());
+ storage.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up GCS notifications", e);
+ }
+
+ try {
+ cleanupCatalog();
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up Iceberg catalog", e);
+ }
+
+ try {
+ Iterable<Blob> blobs =
+ storage
+ .list(WAREHOUSE.replace("gs://", ""),
Storage.BlobListOption.prefix(dirName))
+ .getValues();
+ blobs.forEach(b -> storage.delete(b.getBlobId()));
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up GCS bucket", e);
+ }
+ }
+
+ @Test
+ public void testStreamingImportFromExistingIcebergTable()
+ throws IOException, InterruptedException {
+ // first create a source iceberg table
+ catalog.createTable(srcTableId, beamSchemaToIcebergSchema(ROW_SCHEMA),
SPEC);
+
+ String filter = format("%s/%s/data/", namespace, srcTableName);
+
+ // build AddFiles pipeline and let it run in the background
+ PipelineResult addFilesPipeline = startAddFilesListener(filter);
+
+ // before writing, confirm the destination table still does not exist
+ assertFalse(catalog.tableExists(destTableId));
+
+ // write some rows to the source table
+ LOG.info("Writing records to the source table");
+ Pipeline q = Pipeline.create();
+ q.apply(Create.of(TEST_ROWS))
+ .setRowSchema(ROW_SCHEMA)
+ .apply(
+ Managed.write(Managed.ICEBERG)
+ .withConfig(
+ ImmutableMap.of(
+ "table", srcTableId.toString(), "catalog_properties",
BIGLAKE_PROPS)));
+ q.run().waitUntilFinish();
+
+ // check that the destination table has been created
+ Awaitility.await()
+ .atMost(java.time.Duration.ofMinutes(5))
+ .pollInterval(java.time.Duration.ofSeconds(10))
+ .until(() -> catalog.tableExists(destTableId));
+ LOG.info("Destination table has been created");
+
+ LOG.info("Checking if all source files have been registered in the
destination table");
+ Awaitility.await()
+ .atMost(java.time.Duration.ofMinutes(2))
+ .pollInterval(java.time.Duration.ofSeconds(5))
+ .until(() -> checkTableFiles() != null);
+ LOG.info("Destination table has registered all source files.");
+ Pair<Map<String, DataFile>, Map<String, DataFile>> srcAndDestfiles =
+ checkStateNotNull(checkTableFiles());
+
+ for (Map.Entry<String, DataFile> srcFile :
srcAndDestfiles.first().entrySet()) {
+ String location = srcFile.getKey();
+ DataFile destFile =
+ checkStateNotNull(
+ srcAndDestfiles.second().get(location),
+ "Source file '%s' was not registered in the destination table",
+ location);
+
+ // check that partition metadata was preserved
+ assertEquals(destFile.partition(), srcFile.getValue().partition());
+ }
+
+ // safe to cancel the AddFiles pipeline now
+ LOG.info("Canceling AddFiles listener.");
+ addFilesPipeline.cancel();
+
+ // check all records are there
+ checkRecordsInDestinationTable();
+ }
+
+ /**
+ * Fetch all added files in both tables. Return null if some files have not
yet propagated to
+ * destination table
+ */
+ private @Nullable Pair<Map<String, DataFile>, Map<String, DataFile>>
checkTableFiles() {
+ Table destTable = catalog.loadTable(destTableId);
+ Table srcTable = catalog.loadTable(srcTableId);
+
+ Map<String, DataFile> srcFiles = new HashMap<>();
+ Map<String, DataFile> destFiles = new HashMap<>();
+ for (Snapshot snapshot : srcTable.snapshots()) {
+ snapshot.addedDataFiles(srcTable.io()).forEach(df ->
srcFiles.put(df.location(), df));
+ }
+ for (Snapshot snapshot : destTable.snapshots()) {
+ snapshot.addedDataFiles(destTable.io()).forEach(df ->
destFiles.put(df.location(), df));
+ }
+
+ LOG.info(
+ "Number of source files: {}, Number of registered destination files:
{}",
+ srcFiles.size(),
+ destFiles.size());
+
+ if (srcFiles.size() != destFiles.size()) {
+ Set<String> onlyInSrc = new HashSet<>(srcFiles.keySet());
+ onlyInSrc.removeAll(destFiles.keySet());
+ LOG.info("Missing source files: {}", onlyInSrc);
+ return null;
+ }
+
+ return Pair.of(srcFiles, destFiles);
+ }
+
+ @Test
+ public void testStreamingParquetImport()
+ throws InterruptedException, TimeoutException, IOException {
+ // start with a table that does not exist
+
+ String parquetDir = format("%s/%s/", WAREHOUSE, dirName);
+ String tempDir = format("%s/%s-tmp/", WAREHOUSE, dirName);
+
+ // let the add files pipeline run in the background
+ PipelineResult addFilesPipeline = startAddFilesListener(dirName);
+
+ // before writing, confirm the destination table still does not exist
+ assertFalse(catalog.tableExists(destTableId));
+
+ // write some parquet files
+ LOG.info("Writing records to the parquet dir");
+ Pipeline q = Pipeline.create();
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(ROW_SCHEMA);
+ q.apply(Create.of(TEST_ROWS))
+ .setRowSchema(ROW_SCHEMA)
+ .apply(
+ MapElements.into(TypeDescriptor.of(GenericRecord.class))
+ .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+ .setCoder(AvroCoder.of(avroSchema))
+ .apply(
+ FileIO.<String, GenericRecord>writeDynamic()
+ .by(
+ record ->
+ format("%s-%s-%s", record.get("id"),
record.get("name"), record.get("age")))
+ .via(ParquetIO.sink(avroSchema))
+ .withNaming(name -> defaultNaming(name, ".parquet"))
+ .withTempDirectory(tempDir)
+ .to(parquetDir)
+ .withDestinationCoder(StringUtf8Coder.of()));
+ q.run().waitUntilFinish();
+
+ GcsUtil gcsUtil =
TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil();
+
+ Iterable<StorageObject> objects =
+ gcsUtil.listObjects(WAREHOUSE.replace("gs://", ""), dirName,
null).getItems();
+ List<String> writtenFilePaths =
+ Lists.newArrayList(objects).stream()
+ .map(o -> format("gs://%s/%s", o.getBucket(), o.getName()))
+ .collect(Collectors.toList());
+ LOG.info("Written file paths: {}", writtenFilePaths);
+
+ // check that the destination table has been created
+ Awaitility.await()
+ .atMost(java.time.Duration.ofMinutes(1))
+ .pollInterval(java.time.Duration.ofSeconds(5))
+ .until(() -> catalog.tableExists(destTableId));
+ LOG.info("Destination table has been created");
+
+ LOG.info("Checking if all source files have been registered in the
destination table");
+ Awaitility.await()
+ .atMost(java.time.Duration.ofMinutes(2))
+ .pollInterval(java.time.Duration.ofSeconds(5))
+ .until(() -> checkTableHasRegisteredParquetFiles(writtenFilePaths));
+ LOG.info(
+ "Destination table has registered all source files ({} files).",
writtenFilePaths.size());
+
+ // safe to cancel the AddFiles pipeline now
+ LOG.info("Canceling AddFiles listener.");
+ addFilesPipeline.cancel();
+
+ // check all records are there
+ checkRecordsInDestinationTable();
+ }
+
+ @Test
+ public void testBatchParquetImport() throws IOException {
+ // start with a table that does not exist
+
+ String parquetDir = format("%s/%s/", WAREHOUSE, dirName);
+ String tempDir = format("%s/%s-tmp/", WAREHOUSE, dirName);
+
+ // write some parquet files
+ LOG.info("Writing records to the parquet dir");
+ Pipeline q = Pipeline.create();
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(ROW_SCHEMA);
+ q.apply(Create.of(TEST_ROWS))
+ .setRowSchema(ROW_SCHEMA)
+ .apply(
+ MapElements.into(TypeDescriptor.of(GenericRecord.class))
+ .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+ .setCoder(AvroCoder.of(avroSchema))
+ .apply(
+ FileIO.<String, GenericRecord>writeDynamic()
+ .by(
+ record ->
+ format("%s-%s-%s", record.get("id"),
record.get("name"), record.get("age")))
+ .via(ParquetIO.sink(avroSchema))
+ .withNaming(name -> defaultNaming(name, ".parquet"))
+ .withTempDirectory(tempDir)
+ .to(parquetDir)
+ .withDestinationCoder(StringUtf8Coder.of()));
+ q.run().waitUntilFinish();
+
+ GcsUtil gcsUtil =
TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil();
+
+ Iterable<StorageObject> objects =
+ gcsUtil.listObjects(WAREHOUSE.replace("gs://", ""), dirName,
null).getItems();
+ List<String> writtenFilePaths =
+ Lists.newArrayList(objects).stream()
+ .map(o -> format("gs://%s/%s", o.getBucket(), o.getName()))
+ .collect(Collectors.toList());
+ LOG.info("Written file paths: {}", writtenFilePaths);
+
+ // before adding, confirm the destination table still does not exist
+ assertFalse(catalog.tableExists(destTableId));
+
+ // run batch AddFiles
+ Pipeline p = Pipeline.create();
+ PCollectionRowTuple tuple =
+ p.apply(Create.of(writtenFilePaths))
+ .apply(
+ new AddFiles(
+
IcebergCatalogConfig.builder().setCatalogProperties(BIGLAKE_PROPS).build(),
+ namespace + "." + destTableName,
+ null,
+ PARTITION_FIELDS,
+ TABLE_PROPS,
+ 10,
+ Duration.standardSeconds(10)));
+ PAssert.that(tuple.get("errors")).empty();
+ p.run().waitUntilFinish();
+
+ // check that the destination table has been created
+ assertTrue(catalog.tableExists(destTableId));
+ LOG.info("Destination table has been created");
+
+ LOG.info("Checking if all source files have been registered in the
destination table");
+ assertTrue(checkTableHasRegisteredParquetFiles(writtenFilePaths));
+ LOG.info(
+ "Destination table has registered all source files ({} files).",
writtenFilePaths.size());
+
+ // check all records are there
+ checkRecordsInDestinationTable();
+ }
+
+ private void checkRecordsInDestinationTable() {
+ Pipeline s = Pipeline.create();
+ PCollection<Row> destRows =
+ s.apply(
+ Managed.read(Managed.ICEBERG)
+ .withConfig(
+ ImmutableMap.of(
+ "table", destTableId.toString(),
"catalog_properties", BIGLAKE_PROPS)))
+ .getSinglePCollection();
+ PAssert.that(destRows).containsInAnyOrder(TEST_ROWS);
+ s.run().waitUntilFinish();
+ }
+
+ private boolean checkTableHasRegisteredParquetFiles(List<String>
parquetFiles) {
+ Table destTable = catalog.loadTable(destTableId);
+
+ int numRegisteredFiles = 0;
+ for (Snapshot snapshot : destTable.snapshots()) {
+ numRegisteredFiles +=
Iterables.size(snapshot.addedDataFiles(destTable.io()));
+ }
+ LOG.info(
+ "Number of source files: {}, Number of registered destination files:
{}",
+ parquetFiles.size(),
+ numRegisteredFiles);
+ return numRegisteredFiles == parquetFiles.size();
+ }
+
+ private PipelineResult startAddFilesListener(String filter) throws
InterruptedException {
+ DirectOptions options =
TestPipeline.testingPipelineOptions().as(DirectOptions.class);
+ options.setBlockOnRun(false);
+ Pipeline p = Pipeline.create(options);
+
+ PCollectionRowTuple tuple =
+ p.apply(PubsubIO.readStrings().fromTopic(notificationsTopic))
+ .apply(JsonToRow.withSchema(NOTIFICATION_SCHEMA))
+ .apply(Filter.by(row -> row.getString("name").contains(filter)))
+ .apply(
+ MapElements.into(strings())
+ .via(
+ row ->
+ format("gs://%s/%s", row.getString("bucket"),
row.getString("name"))))
+ .apply(Deduplicate.values())
+ .apply(
+ new AddFiles(
+
IcebergCatalogConfig.builder().setCatalogProperties(BIGLAKE_PROPS).build(),
+ namespace + "." + destTableName,
+ null,
+ PARTITION_FIELDS,
+ TABLE_PROPS,
+ 10,
+ Duration.standardSeconds(10)));
+ PAssert.that(tuple.get("errors")).empty();
+ PipelineResult result = p.run();
+
+ LOG.info(
+ "Started running the AddFiles listener pipeline. Waiting for 10s to
allow it enough time to setup");
+ Thread.sleep(10_000);
+ return result;
+ }
+}
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
new file mode 100644
index 00000000000..287b9140e00
--- /dev/null
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.PREFIX_ERROR;
+import static
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.UNKNOWN_PARTITION_ERROR;
+import static
org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.getPartitionFromMetrics;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AddFilesTest {
// Holds the pre-existing Parquet files that tests register; `root` is its absolute path with a
// trailing slash (assigned in setup()).
@Rule public TemporaryFolder temp = new TemporaryFolder();
private String root;
@Rule public TestPipeline pipeline = TestPipeline.create();

// Local Hadoop catalog over the test warehouse, used to create and inspect tables.
private HadoopCatalog catalog;
// Per-test table identifier ("default.<test method name>"), assigned in setup().
private TableIdentifier tableId;
// Schema shared by every data file written in this class.
private final org.apache.iceberg.Schema icebergSchema =
    new org.apache.iceberg.Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.required(2, "name", Types.StringType.get()),
        Types.NestedField.required(3, "age", Types.IntegerType.get()));
// Default partition fields: identity on age plus a 3-character truncation of name.
private final List<String> partitionFields = Arrays.asList("age", "truncate(name, 3)");
// Spec built from partitionFields; also the spec createWriter() attaches to partitioned files.
// Declaration order matters: this initializer reads partitionFields and icebergSchema.
private final PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, icebergSchema);
// Computes partition tuples for `spec` from sample records (via wrap() then copy()).
private final PartitionKey wrapper = new PartitionKey(spec, icebergSchema);
// Table properties passed to AddFiles and verified on the created table. "foo" is an arbitrary
// marker; "write.metadata.metrics.default=full" is also fed to MetricsConfig.fromProperties,
// presumably to collect full, untruncated column bounds — confirm against Iceberg docs.
private final Map<String, String> tableProps =
    ImmutableMap.of("write.metadata.metrics.default", "full", "foo", "bar");
// Points AddFiles at the same Hadoop warehouse as `catalog`; assigned in setup().
private IcebergCatalogConfig catalogConfig;
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@Rule public TestName testName = new TestName();
// NOTE(review): not referenced by any test in this class.
@Rule public ExpectedException thrown = ExpectedException.none();

// Warehouse rooted in TEMPORARY_FOLDER; its `location` backs the Hadoop catalog.
@Rule
public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+ @Before
+ public void setup() throws Exception {
+ // Root for existing data files:
+ root = temp.getRoot().getAbsolutePath() + "/";
+
+ // Set up a local Hadoop Catalog
+ catalog = new HadoopCatalog(new Configuration(), warehouse.location);
+ tableId = TableIdentifier.of("default", testName.getMethodName());
+
+ catalogConfig =
+ IcebergCatalogConfig.builder()
+ .setCatalogProperties(
+ ImmutableMap.of("type", "hadoop", "warehouse",
warehouse.location))
+ .build();
+ }
+
+ @Test
+ public void testAddPartitionedFiles() throws Exception {
+ testAddFilesWithPartitionPath(true);
+ }
+
+ @Test
+ public void testAddUnPartitionedFiles() throws Exception {
+ testAddFilesWithPartitionPath(false);
+ }
+
+ public void testAddFilesWithPartitionPath(boolean isPartitioned) throws
Exception {
+ // 1. Generate two local Parquet file.
+ // Include Hive-like partition path if testing partition case
+ String partitionPath1 = isPartitioned ? "age=20/name_trunc=Mar/" : "";
+ String file1 = root + partitionPath1 + "data1.parquet";
+ wrapper.wrap(record(-1, "Mar", 20));
+ DataWriter<Record> writer = createWriter(file1, isPartitioned ?
wrapper.copy() : null);
+ writer.write(record(1, "Mark", 20));
+ writer.write(record(2, "Martin", 20));
+ writer.close();
+
+ String partitionPath2 = isPartitioned ? "age=25/name_trunc=Sam/" : "";
+ String file2 = root + partitionPath2 + "data2.parquet";
+ wrapper.wrap(record(-1, "Sam", 25));
+ DataWriter<Record> writer2 = createWriter(file2, isPartitioned ?
wrapper.copy() : null);
+ writer2.write(record(3, "Samantha", 25));
+ writer2.write(record(4, "Sammy", 25));
+ writer2.close();
+
+ // 2. Setup the input PCollection
+ PCollection<String> inputFiles = pipeline.apply("Create Input",
Create.of(file1, file2));
+
+ // 3. Apply the transform (Trigger aggressively for testing)
+ PCollectionRowTuple output =
+ inputFiles.apply(
+ new AddFiles(
+ catalogConfig,
+ tableId.toString(),
+ isPartitioned ? root : null,
+ isPartitioned ? partitionFields : null,
+ tableProps,
+ 2, // trigger at 2 files
+ Duration.standardSeconds(10)));
+
+ // 4. Validate PCollection Outputs
+ PAssert.that(output.get("errors")).empty();
+
+ // 5. Run the pipeline
+ pipeline.run().waitUntilFinish();
+
+ // 6. Validate the Iceberg Table was created with the correct spec and
properties
+ Table table = catalog.loadTable(tableId);
+ tableProps.forEach((key, value) -> assertThat(table.properties(),
hasEntry(key, value)));
+ assertEquals(isPartitioned ? spec : PartitionSpec.unpartitioned(),
table.spec());
+
+ // Check that we have exactly 1 snapshot with 2 files
+ assertEquals(1, Iterables.size(table.snapshots()));
+
+ List<DataFile> addedFiles =
+ Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()));
+ assertEquals(2, addedFiles.size());
+
+ // Verify file paths
+ assertTrue(addedFiles.stream().anyMatch(df ->
df.location().contains("data1.parquet")));
+ assertTrue(addedFiles.stream().anyMatch(df ->
df.location().contains("data2.parquet")));
+
+ // check metrics metadata is preserved
+ DataFile writtenDf1 = writer.toDataFile();
+ DataFile writtenDf2 = writer2.toDataFile();
+ DataFile addedDf1 =
+ Iterables.getOnlyElement(
+ addedFiles.stream()
+ .filter(df -> df.location().contains("data1.parquet"))
+ .collect(Collectors.toList()));
+ DataFile addedDf2 =
+ Iterables.getOnlyElement(
+ addedFiles.stream()
+ .filter(df -> df.location().contains("data2.parquet"))
+ .collect(Collectors.toList()));
+
+ assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds());
+ assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds());
+ assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds());
+ assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds());
+
+ // check partition metadata is preserved
+ assertEquals(writtenDf1.partition(), addedDf1.partition());
+ assertEquals(writtenDf2.partition(), addedDf2.partition());
+ }
+
+ @Test
+ public void testAddFilesWithPartitionFromMetrics() throws IOException {
+ // 1. Generate local Parquet files with no directory structure.
+ String file1 = root + "data1.parquet";
+ DataWriter<Record> writer = createWriter(file1);
+ writer.write(record(1, "Mark", 20));
+ writer.write(record(2, "Martin", 20));
+ writer.close();
+ PartitionData expectedPartition1 = new PartitionData(spec.partitionType());
+ expectedPartition1.set(0, 20);
+ expectedPartition1.set(1, "Mar");
+
+ String file2 = root + "data2.parquet";
+ DataWriter<Record> writer2 = createWriter(file2);
+ writer2.write(record(3, "Samantha", 25));
+ writer2.write(record(4, "Sammy", 25));
+ writer2.close();
+ PartitionData expectedPartition2 = new PartitionData(spec.partitionType());
+ expectedPartition2.set(0, 25);
+ expectedPartition2.set(1, "Sam");
+
+ // Also create a "bad" DataFile, containing values that correspond to
different partitions
+ // This file should get output to the DLQ, because we cannot determine its
partition
+ String file3 = root + "data3.parquet";
+ DataWriter<Record> writer3 = createWriter(file3);
+ writer3.write(record(5, "Johnny", 25));
+ writer3.write(record(6, "Yaseen", 32));
+ writer3.close();
+
+ // 2. Setup the input PCollection
+ PCollection<String> inputFiles = pipeline.apply("Create Input",
Create.of(file1, file2, file3));
+
+ // 3. Apply the transform (Trigger aggressively for testing)
+ PCollectionRowTuple output =
+ inputFiles.apply(
+ new AddFiles(
+ catalogConfig,
+ tableId.toString(),
+ null, // no prefix, so determine partition from DF metrics
+ partitionFields,
+ tableProps,
+ 2, // trigger at 2 files
+ Duration.standardSeconds(10)));
+
+ // 4. There should be an error for File3, because its partition could not
be determined
+ PAssert.that(output.get("errors"))
+ .satisfies(
+ errorRows -> {
+ Row errorRow = Iterables.getOnlyElement(errorRows);
+ checkState(
+ errorRow.getSchema().equals(AddFiles.ERROR_SCHEMA)
+ && file3.equals(errorRow.getString(0))
+ && checkStateNotNull(errorRow.getString(1))
+ .startsWith(UNKNOWN_PARTITION_ERROR));
+ return null;
+ });
+
+ // 5. Run the pipeline
+ pipeline.run().waitUntilFinish();
+
+ // 6. Validate the Iceberg Table was created with the correct spec and
properties
+ Table table = catalog.loadTable(tableId);
+ tableProps.forEach((key, value) -> assertThat(table.properties(),
hasEntry(key, value)));
+ assertEquals(spec, table.spec());
+
+ // Check that we have exactly 1 snapshot with 2 files
+ assertEquals(1, Iterables.size(table.snapshots()));
+
+ List<DataFile> addedFiles =
+ Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io()));
+ assertEquals(2, addedFiles.size());
+
+ // Verify file paths
+ assertTrue(addedFiles.stream().anyMatch(df ->
df.location().contains("data1.parquet")));
+ assertTrue(addedFiles.stream().anyMatch(df ->
df.location().contains("data2.parquet")));
+
+ // check metrics metadata is preserved
+ DataFile writtenDf1 = writer.toDataFile();
+ DataFile writtenDf2 = writer2.toDataFile();
+ DataFile addedDf1 =
+ Iterables.getOnlyElement(
+ addedFiles.stream()
+ .filter(df -> df.location().contains("data1.parquet"))
+ .collect(Collectors.toList()));
+ DataFile addedDf2 =
+ Iterables.getOnlyElement(
+ addedFiles.stream()
+ .filter(df -> df.location().contains("data2.parquet"))
+ .collect(Collectors.toList()));
+
+ assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds());
+ assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds());
+ assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds());
+ assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds());
+
+ // check partition metadata is preserved
+ assertEquals(expectedPartition1, addedDf1.partition());
+ assertEquals(expectedPartition2, addedDf2.partition());
+ }
+
+ @Test
+ public void testStreamingAdds() throws IOException {
+ List<String> paths = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ String file = String.format("%sdata_%s.parquet", root, i);
+ DataWriter<Record> writer = createWriter(file);
+ writer.write(record(1, "SomeName", 30));
+ writer.close();
+ paths.add(file);
+ }
+
+ PCollection<String> files =
+ pipeline.apply(
+ TestStream.create(StringUtf8Coder.of())
+ .addElements(
+ paths.get(0),
+ paths.subList(1, 15).toArray(new String[] {})) // should
commit twice
+ .advanceProcessingTime(Duration.standardSeconds(10))
+ .addElements(
+ paths.get(15),
+ paths.subList(16, 40).toArray(new String[] {})) // should
commit 3 times
+ .advanceProcessingTime(Duration.standardSeconds(10))
+ .addElements(
+ paths.get(40),
+ paths.subList(41, 45).toArray(new String[] {})) // should
commit once
+ .advanceWatermarkToInfinity());
+
+ files.apply(
+ new AddFiles(
+ catalogConfig,
+ tableId.toString(),
+ null,
+ null,
+ null,
+ 10, // trigger at 10 files
+ Duration.standardSeconds(5)));
+ pipeline.run().waitUntilFinish();
+
+ Table table = catalog.loadTable(tableId);
+
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ snapshots.sort(Comparator.comparingLong(Snapshot::timestampMillis));
+
+ assertEquals(6, snapshots.size());
+ assertEquals(10,
Iterables.size(snapshots.get(0).addedDataFiles(table.io())));
+ assertEquals(5,
Iterables.size(snapshots.get(1).addedDataFiles(table.io())));
+ assertEquals(10,
Iterables.size(snapshots.get(2).addedDataFiles(table.io())));
+ assertEquals(10,
Iterables.size(snapshots.get(3).addedDataFiles(table.io())));
+ assertEquals(5,
Iterables.size(snapshots.get(4).addedDataFiles(table.io())));
+ assertEquals(5,
Iterables.size(snapshots.get(5).addedDataFiles(table.io())));
+ }
+
+ @Test
+ public void testUnknownFormatErrors() throws Exception {
+ catalog.createTable(tableId, icebergSchema);
+ // Create a dummy text file (unsupported extension)
+ File txtFile = temp.newFile("unsupported.txt");
+ txtFile.createNewFile();
+
+ PCollection<String> inputFiles =
+ pipeline.apply("Create Input", Create.of(txtFile.getAbsolutePath()));
+
+ AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null,
null, null, 1, null);
+ PCollectionRowTuple outputTuple = inputFiles.apply(addFiles);
+
+ // Validate the file ended up in the errors PCollection with the correct
schema
+ PAssert.that(outputTuple.get("errors"))
+ .containsInAnyOrder(
+ Row.withSchema(AddFiles.ERROR_SCHEMA)
+ .addValues(txtFile.getAbsolutePath(), "Could not determine the
file's format")
+ .build());
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testPartitionPrefixErrors() throws Exception {
+ // Drop unpartitioned table and create a partitioned one
+ catalog.dropTable(tableId);
+ PartitionSpec spec =
PartitionSpec.builderFor(icebergSchema).identity("name").build();
+ catalog.createTable(tableId, icebergSchema, spec);
+
+ String file1 = root + "data1.parquet";
+ wrapper.wrap(record(-1, "And", 30));
+ DataWriter<Record> writer = createWriter(file1, wrapper.copy());
+ writer.write(record(1, "Andrew", 30));
+ writer.close();
+
+ PCollection<String> inputFiles = pipeline.apply("Create Input",
Create.of(file1));
+
+ // Notice locationPrefix is "some/prefix/" but the absolute path doesn't
start with it
+ AddFiles addFiles =
+ new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", null,
null, 1, null);
+ PCollectionRowTuple outputTuple = inputFiles.apply(addFiles);
+
+ PAssert.that(outputTuple.get("errors"))
+ .containsInAnyOrder(
+ Row.withSchema(AddFiles.ERROR_SCHEMA).addValues(file1,
PREFIX_ERROR).build());
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testRecognizesBucketPartitionMismatch() throws IOException {
+ catalog.dropTable(tableId);
+
+ String file1 = root + "data1.parquet";
+ wrapper.wrap(record(-1, "And", 30));
+ DataWriter<Record> writer = createWriter(file1, wrapper.copy());
+ writer.write(record(1, "Andrew", 30));
+ writer.write(record(5, "Sally", 30));
+ writer.write(record(10, "Ahmed", 30));
+ writer.close();
+
+ // 1 (min) and 10 (max) will transform to bucket=0
+ // 5 (some middle value) transforms to bucket=1
+ // To prove this transform value mapping^, below is a sanity check.
+ // We should recognize that we cannot assign a partition to such a file,
and pass it to DLQ.
+ List<String> partitionFields = Arrays.asList("bucket(id, 2)", "age");
+ PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields,
icebergSchema);
+ PartitionField bucketPartition = spec.fields().get(0);
+ assertEquals("id_bucket", bucketPartition.name());
+ assertTrue(bucketPartition.transform().toString().contains("bucket["));
+ SerializableFunction<Long, Integer> transformFunc =
+ (SerializableFunction<Long, Integer>)
+ bucketPartition.transform().bind(Types.LongType.get());
+ assertEquals(0, (int) transformFunc.apply(1L));
+ assertEquals(1, (int) transformFunc.apply(5L));
+ assertEquals(0, (int) transformFunc.apply(10L));
+
+ AddFiles addFiles =
+ new AddFiles(catalogConfig, tableId.toString(), null, partitionFields,
null, 1, null);
+ PCollection<String> inputFiles = pipeline.apply("Create Input",
Create.of(file1));
+ PCollectionRowTuple outputTuple = inputFiles.apply(addFiles);
+
+ PAssert.that(outputTuple.get("errors"))
+ .containsInAnyOrder(
+ Row.withSchema(AddFiles.ERROR_SCHEMA)
+ .addValues(
+ file1,
+ UNKNOWN_PARTITION_ERROR
+ + "Found records with conflicting transformed values,
for column: id")
+ .build());
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testGetPartitionFromMetrics() throws IOException,
InterruptedException {
+ PartitionSpec partitionSpec =
+ PartitionSpec.builderFor(icebergSchema)
+ .bucket("id", 2)
+ .truncate("name", 4)
+ .identity("age")
+ .build();
+
+ List<PartitionTestCase> testCases =
+ Arrays.asList(
+ PartitionTestCase.of(
+ root + "data_1.parquet",
+ record(1, "aaaa", 10),
+ Arrays.asList(
+ record(1, "aaaa123", 10),
+ record(10, "aaaa789", 10),
+ record(100, "aaaa456", 10)),
+ Arrays.asList(1, CharBuffer.wrap("aaaa123"), 10),
+ Arrays.asList(100, CharBuffer.wrap("aaaa789"), 10),
+ "id_bucket=0/name_trunc=aaaa/age=10"),
+ PartitionTestCase.of(
+ root + "data_2.parquet",
+ record(1, "bbbb", 30),
+ Arrays.asList(
+ record(5, "bbbb789", 30),
+ record(55, "bbbb456", 30),
+ record(500, "bbbb123", 30)),
+ Arrays.asList(5, CharBuffer.wrap("bbbb123"), 30),
+ Arrays.asList(500, CharBuffer.wrap("bbbb789"), 30),
+ "id_bucket=1/name_trunc=bbbb/age=30"));
+
+ PartitionKey pk = new PartitionKey(partitionSpec, icebergSchema);
+ MetricsConfig metricsConfig = MetricsConfig.fromProperties(tableProps);
+ Table table = catalog.createTable(tableId, icebergSchema, partitionSpec);
+
+ for (PartitionTestCase caze : testCases) {
+ List<Record> records = caze.records;
+ String fileName = caze.fileName;
+ pk.wrap(caze.partition);
+ DataWriter<Record> writer = createWriter(fileName, pk.copy());
+
+ for (Record record : records) {
+ writer.write(record);
+ }
+ writer.close();
+ InputFile file = table.io().newInputFile(fileName);
+
+ Metrics metrics =
+ AddFiles.getFileMetrics(
+ file, FileFormat.PARQUET, metricsConfig,
MappingUtil.create(icebergSchema));
+ for (int i = 0; i < partitionSpec.fields().size(); i++) {
+ PartitionField partitionField = partitionSpec.fields().get(i);
+ Types.NestedField field =
icebergSchema.findField(partitionField.sourceId());
+ ByteBuffer lowerBytes = metrics.lowerBounds().get(field.fieldId());
+ ByteBuffer upperBytes = metrics.upperBounds().get(field.fieldId());
+
+ Object lower = Conversions.fromByteBuffer(field.type(), lowerBytes);
+ Object upper = Conversions.fromByteBuffer(field.type(), upperBytes);
+
+ assertEquals(caze.expectedLower.get(i), lower);
+ assertEquals(caze.expectedUpper.get(i), upper);
+ }
+
+ String partitionPath = getPartitionFromMetrics(metrics, file, table);
+ assertEquals(caze.expectedPartition, partitionPath);
+ }
+ }
+
+ @Test
+ public void testThrowPartitionMismatchError() throws IOException,
InterruptedException {
+ PartitionSpec partitionSpec =
+ PartitionSpec.builderFor(icebergSchema)
+ .bucket("id", 2)
+ .truncate("name", 4)
+ .identity("age")
+ .build();
+
+ List<PartitionTestCase> testCases =
+ Arrays.asList(
+ PartitionTestCase.of(
+ root + "data_1.parquet",
+ record(1, "aaaa", 10),
+ Arrays.asList(
+ record(1, "aaaa123", 10), record(10, "abab", 10),
record(100, "aaaa789", 10)),
+ Arrays.asList(1, CharBuffer.wrap("aaaa123"), 10),
+ Arrays.asList(100, CharBuffer.wrap("abab"), 10),
+ "error"),
+ PartitionTestCase.of(
+ root + "data_2.parquet",
+ record(1, "bbbb", 30),
+ Arrays.asList(
+ record(5, "bbbb", 30), record(55, "bbbb", 30), record(500,
"bbbb", 50)),
+ Arrays.asList(5, CharBuffer.wrap("bbbb"), 30),
+ Arrays.asList(500, CharBuffer.wrap("bbbb"), 50),
+ "error"));
+
+ PartitionKey pk = new PartitionKey(partitionSpec, icebergSchema);
+ MetricsConfig metricsConfig = MetricsConfig.fromProperties(tableProps);
+ Table table = catalog.createTable(tableId, icebergSchema, partitionSpec);
+
+ for (PartitionTestCase caze : testCases) {
+ List<Record> records = caze.records;
+ String fileName = caze.fileName;
+ pk.wrap(caze.partition);
+ DataWriter<Record> writer = createWriter(fileName, pk.copy());
+
+ for (Record record : records) {
+ writer.write(record);
+ }
+ writer.close();
+ InputFile file = table.io().newInputFile(fileName);
+
+ Metrics metrics =
+ AddFiles.getFileMetrics(
+ file, FileFormat.PARQUET, metricsConfig,
MappingUtil.create(icebergSchema));
+ // check that lower/upper stats are still fetched correctly
+ for (int i = 0; i < partitionSpec.fields().size(); i++) {
+ PartitionField partitionField = partitionSpec.fields().get(i);
+ Types.NestedField field =
icebergSchema.findField(partitionField.sourceId());
+ ByteBuffer lowerBytes = metrics.lowerBounds().get(field.fieldId());
+ ByteBuffer upperBytes = metrics.upperBounds().get(field.fieldId());
+
+ Object lower = Conversions.fromByteBuffer(field.type(), lowerBytes);
+ Object upper = Conversions.fromByteBuffer(field.type(), upperBytes);
+
+ assertEquals(caze.expectedLower.get(i), lower);
+ assertEquals(caze.expectedUpper.get(i), upper);
+ }
+
+ assertThrows(
+ AddFiles.UnknownPartitionException.class,
+ () -> getPartitionFromMetrics(metrics, file, table));
+ }
+ }
+
+ static class PartitionTestCase {
+ String fileName;
+ StructLike partition;
+ List<Record> records;
+ List<Object> expectedLower;
+ List<Object> expectedUpper;
+ String expectedPartition;
+
+ PartitionTestCase(
+ String fileName,
+ StructLike partition,
+ List<Record> records,
+ List<Object> expectedLower,
+ List<Object> expectedUpper,
+ String expectedPartition) {
+ this.fileName = fileName;
+ this.partition = partition;
+ this.records = records;
+ this.expectedLower = expectedLower;
+ this.expectedUpper = expectedUpper;
+ this.expectedPartition = expectedPartition;
+ }
+
+ static PartitionTestCase of(
+ String fileName,
+ StructLike partition,
+ List<Record> records,
+ List<Object> expectedLower,
+ List<Object> expectedUpper,
+ String expectedPartition) {
+ return new PartitionTestCase(
+ fileName, partition, records, expectedLower, expectedUpper,
expectedPartition);
+ }
+ }
+
+ private DataWriter<Record> createWriter(String file) throws IOException {
+ return createWriter(file, null);
+ }
+
+ private DataWriter<Record> createWriter(String file, @Nullable StructLike
partition)
+ throws IOException {
+ return Parquet.writeData(Files.localOutput(file))
+ .schema(icebergSchema)
+ .withSpec(partition != null ? spec : PartitionSpec.unpartitioned())
+ .withPartition(partition)
+ .createWriterFunc(GenericParquetWriter::create)
+ .build();
+ }
+
+ private Record record(int id, String name, int age) {
+ return GenericRecord.create(icebergSchema).copy("id", id, "name", name,
"age", age);
+ }
+}
diff --git
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index e6e4e27d74f..d365cd5f7c7 100644
---
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -971,10 +971,10 @@ public class ParquetIO {
}
}
- private static class BeamParquetInputFile implements InputFile {
+ public static class BeamParquetInputFile implements InputFile {
private final SeekableByteChannel seekableByteChannel;
- BeamParquetInputFile(SeekableByteChannel seekableByteChannel) {
+ public BeamParquetInputFile(SeekableByteChannel seekableByteChannel) {
this.seekableByteChannel = seekableByteChannel;
}