FrankChen021 commented on code in PR #19266:
URL: https://github.com/apache/druid/pull/19266#discussion_r3324570062


##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergFileTaskInputSource.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceFactory;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * A {@link SplittableInputSource} representing a single Iceberg v2 data file 
with its
+ * associated delete files. Created internally by {@link IcebergInputSource} 
when v2
+ * delete files are detected. Reports itself as non-splittable (one file = one 
split)
+ * so MSQ routes it to a single worker without further splitting.
+ *
+ * This input source is JSON-serializable, carrying all metadata needed for a 
worker
+ * to read a data file and apply deletes without catalog access:
+ * <ul>
+ *   <li>Data file path</li>
+ *   <li>Delete file metadata (paths, types, equality field IDs)</li>
+ *   <li>Serialized Iceberg table schema (JSON)</li>
+ *   <li>warehouseSource for file I/O</li>
+ * </ul>
+ */
+public class IcebergFileTaskInputSource implements 
SplittableInputSource<List<String>>
+{
+  public static final String TYPE_KEY = "icebergFileTask";
+
+  private final String dataFilePath;
+  private final List<DeleteFileInfo> deleteFiles;
+  private final String tableSchemaJson;
+  private final InputSourceFactory warehouseSource;
+  private final String fileIOImpl;
+  private final Map<String, String> fileIOProperties;
+  // TODO https://github.com/apache/druid/issues/19472: extend to ORC/AVRO 
once iceberg-orc/iceberg-avro deps are added
+  private final String fileFormat;
+  private final Configuration hadoopConf;
+
+  @JsonCreator
+  public IcebergFileTaskInputSource(
+      @JsonProperty("dataFilePath") final String dataFilePath,
+      @JsonProperty("deleteFiles") final List<DeleteFileInfo> deleteFiles,
+      @JsonProperty("tableSchemaJson") final String tableSchemaJson,
+      @JsonProperty("warehouseSource") final InputSourceFactory 
warehouseSource,
+      @JsonProperty("fileIOImpl") @Nullable final String fileIOImpl,
+      @JsonProperty("fileIOProperties") @Nullable final Map<String, String> 
fileIOProperties,
+      @JsonProperty("fileFormat") @Nullable final String fileFormat,
+      final Configuration hadoopConf

Review Comment:
   [P1] Unbound Configuration breaks v2 split deserialization
   
   IcebergFileTaskInputSource is registered as a Jackson subtype and returned 
from withSplit for v2 tables, so workers need to deserialize it. The 
@JsonCreator leaves hadoopConf as an unannotated constructor parameter, unlike 
the catalog classes that use @JacksonInject @HiveConf, so Druid's ObjectMapper 
has no property or injectable value to bind. This can make v2 delete splits 
fail before reading. Inject it, annotate it, or create a safe default, and add 
a Jackson round-trip test for this input source.



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceFactory;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * An {@link InputSourceReader} that reads an Iceberg data file and applies
+ * associated positional and equality delete files before converting records
+ * to Druid {@link InputRow} objects.
+ *
+ * Delete application follows the Iceberg v2 spec:
+ * <ol>
+ *   <li>Positional deletes: read (file_path, pos) pairs, filter by current 
data file,
+ *       build a Set of deleted positions</li>
+ *   <li>Equality deletes: read key tuples from equality delete files, build 
Sets
+ *       of deleted key values per equality field set</li>
+ *   <li>Stream data file: for each record, skip if position-deleted or 
equality-deleted</li>
+ * </ol>
+ *
+ * Only Parquet file format is supported. ORC and Avro require additional 
dependencies;
+ * see TODO https://github.com/apache/druid/issues/19472.
+ */
+public class IcebergNativeRecordReader implements InputSourceReader
+{
+  private static final Logger log = new 
Logger(IcebergNativeRecordReader.class);
+
+  private final String dataFilePath;
+  private final List<DeleteFileInfo> deleteFiles;
+  private final String tableSchemaJson;
+  private final InputSourceFactory warehouseSource;
+  private final InputRowSchema inputRowSchema;
+  private final Configuration hadoopConf;
+  private final FileIO fileIO;
+  private final String fileFormat;
+
+  public IcebergNativeRecordReader(
+      final String dataFilePath,
+      final List<DeleteFileInfo> deleteFiles,
+      final String tableSchemaJson,
+      final InputSourceFactory warehouseSource,
+      final InputRowSchema inputRowSchema,
+      @Nullable final String fileIOImpl,
+      @Nullable final Map<String, String> fileIOProperties,
+      final String fileFormat,
+      final Configuration hadoopConf
+  )
+  {
+    this.dataFilePath = dataFilePath;
+    this.deleteFiles = deleteFiles;
+    this.tableSchemaJson = tableSchemaJson;
+    this.warehouseSource = warehouseSource;
+    this.inputRowSchema = inputRowSchema;
+    this.hadoopConf = hadoopConf;
+    this.fileIO = buildFileIO(fileIOImpl, fileIOProperties, hadoopConf);
+    this.fileFormat = fileFormat;
+  }
+
+  private void requireParquet(final String filePath)
+  {
+    if (!"PARQUET".equalsIgnoreCase(fileFormat)) {
+      throw new UnsupportedOperationException(
+          "Iceberg file format [" + fileFormat + "] is not supported for file 
[" + filePath
+          + "]. Only PARQUET is currently supported. See 
https://github.com/apache/druid/issues/19472";
+      );
+    }
+  }
+
+  private void requireParquetDeleteFile(final DeleteFileInfo deleteFileInfo)
+  {
+    if (!"PARQUET".equalsIgnoreCase(deleteFileInfo.getFormat())) {
+      throw new UnsupportedOperationException(
+          "Iceberg delete file format [" + deleteFileInfo.getFormat() + "] is 
not supported for file ["
+          + deleteFileInfo.getPath() + "]. Only PARQUET is currently 
supported. "
+          + "See https://github.com/apache/druid/issues/19472";
+      );
+    }
+  }
+
+  private static FileIO buildFileIO(
+      @Nullable final String fileIOImpl,
+      @Nullable final Map<String, String> fileIOProperties,
+      final Configuration hadoopConf
+  )
+  {
+    final Map<String, String> props = fileIOProperties == null ? 
Collections.emptyMap() : fileIOProperties;
+    if (fileIOImpl == null || fileIOImpl.isEmpty()) {
+      return new HadoopFileIO(hadoopConf);
+    }
+    return CatalogUtil.loadFileIO(fileIOImpl, props, hadoopConf);
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read(final InputStats inputStats) throws 
IOException
+  {
+    final Schema tableSchema = SchemaParser.fromJson(tableSchemaJson);
+
+    // Step 1: Collect positional deletes
+    final Set<Long> deletedPositions = collectPositionalDeletes();
+
+    // Step 2: Collect equality deletes
+    final List<EqualityDeleteSet> equalityDeleteSets = 
collectEqualityDeletes(tableSchema);
+
+    // Step 3: Stream data file with delete application
+    requireParquet(dataFilePath);
+    final InputFile dataInputFile = fileIO.newInputFile(dataFilePath);

Review Comment:
   [P1] V2 path bypasses warehouseSource file access
   
   When delete files are present, the new reader opens data and delete files 
through Iceberg FileIO instead of the configured warehouseSource/inputFormat. 
Existing Iceberg specs use warehouseSource for S3/GCS/local access settings, 
endpoints, and credentials, so a table that only adds delete files can silently 
switch file-access mechanisms and fail or read from the wrong filesystem. The 
v2 path should preserve or translate the warehouseSource-backed access 
configuration for native reads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to