FrankChen021 commented on code in PR #19266: URL: https://github.com/apache/druid/pull/19266#discussion_r3324570062
########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergFileTaskInputSource.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceFactory; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.hadoop.conf.Configuration; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** + * A {@link SplittableInputSource} representing a single Iceberg v2 data file with its + * associated delete files. Created internally by {@link IcebergInputSource} when v2 + * delete files are detected. 
Reports itself as non-splittable (one file = one split) + * so MSQ routes it to a single worker without further splitting. + * + * This input source is JSON-serializable, carrying all metadata needed for a worker + * to read a data file and apply deletes without catalog access: + * <ul> + * <li>Data file path</li> + * <li>Delete file metadata (paths, types, equality field IDs)</li> + * <li>Serialized Iceberg table schema (JSON)</li> + * <li>warehouseSource for file I/O</li> + * </ul> + */ +public class IcebergFileTaskInputSource implements SplittableInputSource<List<String>> +{ + public static final String TYPE_KEY = "icebergFileTask"; + + private final String dataFilePath; + private final List<DeleteFileInfo> deleteFiles; + private final String tableSchemaJson; + private final InputSourceFactory warehouseSource; + private final String fileIOImpl; + private final Map<String, String> fileIOProperties; + // TODO https://github.com/apache/druid/issues/19472: extend to ORC/AVRO once iceberg-orc/iceberg-avro deps are added + private final String fileFormat; + private final Configuration hadoopConf; + + @JsonCreator + public IcebergFileTaskInputSource( + @JsonProperty("dataFilePath") final String dataFilePath, + @JsonProperty("deleteFiles") final List<DeleteFileInfo> deleteFiles, + @JsonProperty("tableSchemaJson") final String tableSchemaJson, + @JsonProperty("warehouseSource") final InputSourceFactory warehouseSource, + @JsonProperty("fileIOImpl") @Nullable final String fileIOImpl, + @JsonProperty("fileIOProperties") @Nullable final Map<String, String> fileIOProperties, + @JsonProperty("fileFormat") @Nullable final String fileFormat, + final Configuration hadoopConf Review Comment: [P1] Unbound Configuration breaks v2 split deserialization IcebergFileTaskInputSource is registered as a Jackson subtype and returned from withSplit for v2 tables, so workers need to deserialize it. 
The @JsonCreator leaves hadoopConf as an unannotated constructor parameter, unlike the catalog classes that use @JacksonInject @HiveConf, so Druid's ObjectMapper has no property or injectable value to bind. This can make v2 delete splits fail before reading. Inject it, annotate it, or create a safe default, and add a Jackson round-trip test for this input source. ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java: ########## @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceFactory; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * An {@link InputSourceReader} that reads an Iceberg data file and applies + * associated positional and equality delete files before converting records + * to Druid {@link InputRow} objects. 
+ * + * Delete application follows the Iceberg v2 spec: + * <ol> + * <li>Positional deletes: read (file_path, pos) pairs, filter by current data file, + * build a Set of deleted positions</li> + * <li>Equality deletes: read key tuples from equality delete files, build Sets + * of deleted key values per equality field set</li> + * <li>Stream data file: for each record, skip if position-deleted or equality-deleted</li> + * </ol> + * + * Only Parquet file format is supported. ORC and Avro require additional dependencies; + * see TODO https://github.com/apache/druid/issues/19472. + */ +public class IcebergNativeRecordReader implements InputSourceReader +{ + private static final Logger log = new Logger(IcebergNativeRecordReader.class); + + private final String dataFilePath; + private final List<DeleteFileInfo> deleteFiles; + private final String tableSchemaJson; + private final InputSourceFactory warehouseSource; + private final InputRowSchema inputRowSchema; + private final Configuration hadoopConf; + private final FileIO fileIO; + private final String fileFormat; + + public IcebergNativeRecordReader( + final String dataFilePath, + final List<DeleteFileInfo> deleteFiles, + final String tableSchemaJson, + final InputSourceFactory warehouseSource, + final InputRowSchema inputRowSchema, + @Nullable final String fileIOImpl, + @Nullable final Map<String, String> fileIOProperties, + final String fileFormat, + final Configuration hadoopConf + ) + { + this.dataFilePath = dataFilePath; + this.deleteFiles = deleteFiles; + this.tableSchemaJson = tableSchemaJson; + this.warehouseSource = warehouseSource; + this.inputRowSchema = inputRowSchema; + this.hadoopConf = hadoopConf; + this.fileIO = buildFileIO(fileIOImpl, fileIOProperties, hadoopConf); + this.fileFormat = fileFormat; + } + + private void requireParquet(final String filePath) + { + if (!"PARQUET".equalsIgnoreCase(fileFormat)) { + throw new UnsupportedOperationException( + "Iceberg file format [" + fileFormat + "] is not 
supported for file [" + filePath + + "]. Only PARQUET is currently supported. See https://github.com/apache/druid/issues/19472" + ); + } + } + + private void requireParquetDeleteFile(final DeleteFileInfo deleteFileInfo) + { + if (!"PARQUET".equalsIgnoreCase(deleteFileInfo.getFormat())) { + throw new UnsupportedOperationException( + "Iceberg delete file format [" + deleteFileInfo.getFormat() + "] is not supported for file [" + + deleteFileInfo.getPath() + "]. Only PARQUET is currently supported. " + + "See https://github.com/apache/druid/issues/19472" + ); + } + } + + private static FileIO buildFileIO( + @Nullable final String fileIOImpl, + @Nullable final Map<String, String> fileIOProperties, + final Configuration hadoopConf + ) + { + final Map<String, String> props = fileIOProperties == null ? Collections.emptyMap() : fileIOProperties; + if (fileIOImpl == null || fileIOImpl.isEmpty()) { + return new HadoopFileIO(hadoopConf); + } + return CatalogUtil.loadFileIO(fileIOImpl, props, hadoopConf); + } + + @Override + public CloseableIterator<InputRow> read(final InputStats inputStats) throws IOException + { + final Schema tableSchema = SchemaParser.fromJson(tableSchemaJson); + + // Step 1: Collect positional deletes + final Set<Long> deletedPositions = collectPositionalDeletes(); + + // Step 2: Collect equality deletes + final List<EqualityDeleteSet> equalityDeleteSets = collectEqualityDeletes(tableSchema); + + // Step 3: Stream data file with delete application + requireParquet(dataFilePath); + final InputFile dataInputFile = fileIO.newInputFile(dataFilePath); Review Comment: [P1] V2 path bypasses warehouseSource file access When delete files are present, the new reader opens data and delete files through Iceberg FileIO instead of the configured warehouseSource/inputFormat. 
Existing Iceberg specs use warehouseSource for S3/GCS/local access settings, endpoints, and credentials, so a table that only adds delete files can silently switch file-access mechanisms and fail or read from the wrong filesystem. The v2 path should preserve or translate the warehouseSource-backed access configuration for native reads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org
