andygrove commented on code in PR #2528:
URL: https://github.com/apache/datafusion-comet/pull/2528#discussion_r2534516919


##########
native/core/src/execution/operators/iceberg_scan.rs:
##########
@@ -0,0 +1,477 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Native Iceberg table scan operator using iceberg-rust
+
+use std::any::Any;
+use std::collections::{HashMap, VecDeque};
+use std::fmt;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use datafusion::common::{DataFusionError, Result as DFResult};
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+};
+use futures::future::BoxFuture;
+use futures::{ready, FutureExt, Stream, StreamExt, TryStreamExt};
+use iceberg::io::FileIO;
+
+use crate::execution::operators::ExecutionError;
+use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use datafusion::datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_comet_spark_expr::EvalMode;
+use datafusion_datasource::file_stream::FileStreamMetrics;
+
+/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
+///
+/// Executes pre-planned FileScanTasks for efficient parallel scanning.
+#[derive(Debug)]
+pub struct IcebergScanExec {
+    /// Iceberg table metadata location for FileIO initialization
+    metadata_location: String,
+    /// Output schema after projection
+    output_schema: SchemaRef,
+    /// Cached execution plan properties
+    plan_properties: PlanProperties,
+    /// Catalog-specific configuration for FileIO
+    catalog_properties: HashMap<String, String>,
+    /// Pre-planned file scan tasks, grouped by partition
+    file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
+    /// Metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl IcebergScanExec {
+    pub fn new(
+        metadata_location: String,
+        schema: SchemaRef,
+        catalog_properties: HashMap<String, String>,
+        file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
+    ) -> Result<Self, ExecutionError> {
+        let output_schema = schema;
+        let num_partitions = file_task_groups.len();
+        let plan_properties = Self::compute_properties(Arc::clone(&output_schema), num_partitions);
+
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        Ok(Self {
+            metadata_location,
+            output_schema,
+            plan_properties,
+            catalog_properties,
+            file_task_groups,
+            metrics,
+        })
+    }
+
+    fn compute_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties {
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(num_partitions),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        )
+    }
+}
+
+impl ExecutionPlan for IcebergScanExec {
+    fn name(&self) -> &str {
+        "IcebergScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.output_schema)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        if partition < self.file_task_groups.len() {
+            let tasks = &self.file_task_groups[partition];
+            self.execute_with_tasks(tasks.clone(), partition, context)
+        } else {
+            Err(DataFusionError::Execution(format!(
+                "IcebergScanExec: Partition index {} out of range (only {} 
task groups available)",
+                partition,
+                self.file_task_groups.len()
+            )))
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+impl IcebergScanExec {
+    /// Handles MOR (Merge-On-Read) tables by automatically applying positional and equality
+    /// deletes via iceberg-rust's ArrowReader.
+    fn execute_with_tasks(
+        &self,
+        tasks: Vec<iceberg::scan::FileScanTask>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let output_schema = Arc::clone(&self.output_schema);
+        let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?;
+        let batch_size = context.session_config().batch_size();
+
+        let metrics = IcebergScanMetrics::new(&self.metrics, partition);
+
+        // Create parallel file stream that overlaps opening next file with reading current file
+        let file_stream = IcebergFileStream::new(
+            tasks,
+            file_io,
+            batch_size,
+            Arc::clone(&output_schema),
+            metrics,
+        )?;
+
+        // Note: BatchSplitStream adds overhead. Since we're already setting batch_size in
+        // iceberg-rust's ArrowReaderBuilder, it should produce correctly sized batches.
+        // Only use BatchSplitStream as a safety net if needed.
+        // For now, return the file_stream directly to reduce stream nesting overhead.
+
+        Ok(Box::pin(file_stream))
+    }
+
+    fn load_file_io(
+        catalog_properties: &HashMap<String, String>,
+        metadata_location: &str,
+    ) -> Result<FileIO, DataFusionError> {
+        let mut file_io_builder = FileIO::from_path(metadata_location)
+            .map_err(|e| DataFusionError::Execution(format!("Failed to create 
FileIO: {}", e)))?;
+
+        for (key, value) in catalog_properties {
+            file_io_builder = file_io_builder.with_prop(key, value);
+        }
+
+        file_io_builder
+            .build()
+            .map_err(|e| DataFusionError::Execution(format!("Failed to build 
FileIO: {}", e)))
+    }
+}
+
+/// Metrics for IcebergScanExec
+struct IcebergScanMetrics {
+    /// Baseline metrics (output rows, elapsed compute time)
+    baseline: BaselineMetrics,
+    /// File stream metrics (time opening, time scanning, etc.)
+    file_stream: FileStreamMetrics,
+    /// Count of file splits (FileScanTasks) processed
+    num_splits: Count,
+}
+
+impl IcebergScanMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            file_stream: FileStreamMetrics::new(metrics, partition),
+            num_splits: MetricBuilder::new(metrics).counter("num_splits", partition),
+        }
+    }
+}
+
+/// State machine for IcebergFileStream
+enum FileStreamState {
+    /// Idle state - need to start opening next file
+    Idle,
+    /// Opening a file
+    Opening {
+        future: BoxFuture<'static, DFResult<SendableRecordBatchStream>>,
+    },
+    /// Reading from current file while potentially opening next file
+    Reading {
+        current: SendableRecordBatchStream,
+        next: Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>>,
+    },
+    /// Error state
+    Error,
+}
+
+/// Stream that reads Iceberg files with parallel opening optimization.
+/// Opens the next file while reading the current file to overlap IO with compute.
+///
+/// Inspired by DataFusion's [`FileStream`] pattern for overlapping file opening with reading.
+///
+/// [`FileStream`]: https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs
+struct IcebergFileStream {
+    schema: SchemaRef,
+    file_io: FileIO,
+    batch_size: usize,
+    tasks: VecDeque<iceberg::scan::FileScanTask>,
+    state: FileStreamState,
+    metrics: IcebergScanMetrics,
+}
+
+impl IcebergFileStream {
+    fn new(
+        tasks: Vec<iceberg::scan::FileScanTask>,
+        file_io: FileIO,
+        batch_size: usize,
+        schema: SchemaRef,
+        metrics: IcebergScanMetrics,
+    ) -> DFResult<Self> {
+        Ok(Self {
+            schema,
+            file_io,
+            batch_size,
+            tasks: tasks.into_iter().collect(),
+            state: FileStreamState::Idle,
+            metrics,
+        })
+    }
+
+    fn start_next_file(
+        &mut self,
+    ) -> Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>> {
+        let task = self.tasks.pop_front()?;
+
+        self.metrics.num_splits.add(1);
+
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let schema = Arc::clone(&self.schema);
+
+        Some(Box::pin(async move {
+            let task_stream = futures::stream::iter(vec![Ok(task)]).boxed();
+
+            let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
+                .with_batch_size(batch_size)
+                .with_row_selection_enabled(true)
+                .build();
+
+            let stream = reader.read(task_stream).map_err(|e| {
+                DataFusionError::Execution(format!("Failed to read Iceberg 
task: {}", e))
+            })?;
+
+            let target_schema = Arc::clone(&schema);
+
+            // Schema adaptation handles differences in Arrow field names and metadata
+            // between the file schema and expected output schema
+            let mapped_stream = stream
+                .map_err(|e| DataFusionError::Execution(format!("Iceberg scan 
error: {}", e)))
+                .and_then(move |batch| {
+                    let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);

Review Comment:
   Do you have tests (perhaps via the Iceberg suite) that cover non-UTC timezone timestamps?
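
   For example, a minimal sketch of the kind of test I have in mind, assuming Comet's Scala test helpers (`withSQLConf`, `checkSparkAnswerAndOperator`) and an Iceberg catalog named `local` configured for the test session (all names here are illustrative, not taken from this PR):

   ```scala
   test("iceberg scan reads timestamps under a non-UTC session timezone") {
     withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
       // Hypothetical table; assumes an Iceberg catalog `local` is registered in the test session
       sql("CREATE TABLE local.db.ts_table (id INT, ts TIMESTAMP) USING iceberg")
       sql("INSERT INTO local.db.ts_table VALUES (1, timestamp'2024-01-15 12:30:00')")
       // Compare the native Iceberg scan's output against Spark's own answer
       checkSparkAnswerAndOperator("SELECT id, ts FROM local.db.ts_table ORDER BY id")
     }
   }
   ```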



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

