Copilot commented on code in PR #596:
URL: https://github.com/apache/hudi-rs/pull/596#discussion_r3191002649


##########
crates/core/src/storage/lance_reader.rs:
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Lance implementation of [`BaseFileReader`].
+//!
+//! Reads individual Lance data files (single-file datasets produced by
+//! Hudi's Lance integration) using the `lance-file` crate.
+
+use std::sync::Arc;
+
+use arrow::compute::concat_batches;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::StreamExt;
+use lance_core::cache::LanceCache;
+use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
+use lance_file::reader::{FileReader, FileReaderOptions, ReaderProjection};
+use lance_io::ReadBatchParams;
+use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
+use lance_io::utils::CachedFileSize;
+use object_store::path::Path as ObjPath;
+
+use crate::statistics::StatisticsContainer;
+use crate::storage::Storage;
+use crate::storage::base_file_reader::{BaseFileReadOptions, BaseFileReader, 
BaseFileStream};
+use crate::storage::error::{Result, StorageError};
+use crate::storage::file_metadata::FileMetadata;
+use crate::storage::util::join_url_segments;
+
+const DEFAULT_BATCH_SIZE: u32 = 8192;
+const DEFAULT_BATCH_READAHEAD: u32 = 16;
+
+/// Lance implementation of [`BaseFileReader`].
+///
+/// Each Hudi base file with `.lance` extension is a standalone Lance data
+/// file containing a single fragment. This reader opens individual files
+/// using `lance-file`'s `FileReader`.
+pub struct LanceBaseFileReader {
+    storage: Arc<Storage>,
+}
+
+impl LanceBaseFileReader {
+    pub fn new(storage: Arc<Storage>) -> Self {
+        Self { storage }
+    }
+
+    async fn open_file_reader(
+        &self,
+        relative_path: &str,
+        projection: Option<&[String]>,
+    ) -> Result<FileReader> {
+        let obj_url = join_url_segments(&self.storage.base_url, 
&[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+
+        let storage_accessor = Arc::new(
+            
lance_io::object_store::StorageOptionsAccessor::with_static_options(
+                (*self.storage.options).clone(),
+            ),
+        );
+        #[allow(deprecated)]
+        let params = lance_io::object_store::ObjectStoreParams {
+            object_store: Some((
+                self.storage.object_store.clone(),
+                (*self.storage.base_url).clone(),
+            )),
+            storage_options_accessor: Some(storage_accessor),
+            ..Default::default()
+        };
+
+        let lance_store = 
lance_io::object_store::ObjectStore::from_uri_and_params(
+            Arc::new(lance_io::object_store::ObjectStoreRegistry::default()),
+            obj_url.as_str(),
+            &params,
+        )
+        .await
+        .map_err(|e| {
+            StorageError::Creation(format!(
+                "Failed to create Lance object store for {relative_path}: {e}"
+            ))
+        })?;
+
+        let scheduler = ScanScheduler::new(lance_store.0, 
SchedulerConfig::default_for_testing());

Review Comment:
   `SchedulerConfig::default_for_testing()` is being used in production code 
paths. Testing defaults commonly trade correctness/coverage for reduced 
resources or different concurrency behavior; this can negatively impact 
performance and reliability in real workloads. Prefer a non-testing default 
(e.g., the normal default config) and/or plumb scheduler configuration via 
`Storage` options or `BaseFileReadOptions`.
   



##########
crates/core/src/storage/mod.rs:
##########
@@ -173,6 +176,51 @@ impl Storage {
         )
     }
 
+    /// Create a [`base_file_reader::BaseFileReader`] for the given format.
+    ///
+    /// Returns `ParquetBaseFileReader` for Parquet; Lance support requires the
+    /// `lance` feature (not yet implemented, returns an error).

Review Comment:
   The doc comment says Lance support is 'not yet implemented, returns an 
error', but `BaseFileFormatValue::Lance` is implemented under `cfg(feature = 
"lance")` and returns `LanceBaseFileReader`. Update the comment to reflect 
the current behavior (feature-gated implementation vs error when the feature 
is disabled).
   



##########
crates/core/src/config/table.rs:
##########
@@ -358,6 +358,21 @@ pub enum BaseFileFormatValue {
     /// HFile format - only valid for metadata tables.
     #[strum(serialize = "hfile")]
     HFile,
+    #[strum(serialize = "lance")]
+    Lance,
+}
+
+impl BaseFileFormatValue {
+    /// Detect format from a file extension, returning `None` if unrecognized.
+    pub fn from_extension(path: &str) -> Option<Self> {
+        if path.ends_with(".parquet") {
+            Some(Self::Parquet)
+        } else if path.ends_with(".lance") {
+            Some(Self::Lance)
+        } else {
+            None
+        }

Review Comment:
   Extension detection is case-sensitive (`.PARQUET` / `.LANCE` won’t match). 
If this is used on paths that may come from user input or systems that emit 
uppercase extensions, it can cause incorrect format resolution and subsequent 
reader selection. Consider normalizing `path` to lowercase for the suffix check 
(or using a case-insensitive comparison).



##########
crates/datafusion/src/hudi_exec.rs:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Custom DataFusion execution plan for reading Hudi tables through
+//! [`FileGroupReader`], supporting all base file formats and MOR log merging.
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream,
+};
+use datafusion_common::DataFusionError::Execution;
+use datafusion_common::Result;
+use futures::stream::TryStreamExt;
+use futures::{StreamExt, stream};
+
+use hudi_core::file_group::file_slice::FileSlice;
+use hudi_core::file_group::reader::FileGroupReader;
+use hudi_core::table::ReadOptions;
+
+/// DataFusion execution plan that reads Hudi file slices through
+/// [`FileGroupReader`].
+///
+/// Used for non-Parquet base file formats (Lance) and MOR snapshot
+/// queries where base + log file merging is required. Parquet-only
+/// COW and MOR read-optimized queries continue to use DataFusion's
+/// native `ParquetSource` path for row-group/page-level pruning.
+#[derive(Debug)]
+pub struct HudiScanExec {
+    file_slice_partitions: Vec<Vec<FileSlice>>,
+    file_group_reader: Arc<FileGroupReader>,
+    read_options: ReadOptions,
+    #[allow(dead_code)]
+    schema: SchemaRef,
+    projected_schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    limit: Option<usize>,
+    properties: PlanProperties,
+}
+
+impl HudiScanExec {
+    pub fn new(
+        file_slice_partitions: Vec<Vec<FileSlice>>,
+        file_group_reader: Arc<FileGroupReader>,
+        read_options: ReadOptions,
+        schema: SchemaRef,
+        projection: Option<Vec<usize>>,
+        limit: Option<usize>,
+    ) -> Self {
+        let projected_schema = if let Some(ref proj) = projection {
+            let fields: Vec<_> = proj.iter().map(|&i| 
schema.field(i).clone()).collect();
+            Arc::new(arrow_schema::Schema::new(fields))
+        } else {
+            schema.clone()
+        };
+
+        let partitions = if file_slice_partitions.is_empty() {
+            vec![vec![]]
+        } else {
+            file_slice_partitions
+        };
+        let n_partitions = partitions.len();
+
+        let properties = PlanProperties::new(
+            
datafusion::physical_expr::EquivalenceProperties::new(projected_schema.clone()),
+            Partitioning::UnknownPartitioning(n_partitions),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+
+        Self {
+            file_slice_partitions: partitions,
+            file_group_reader,
+            read_options,
+            schema,
+            projected_schema,
+            projection,
+            limit,
+            properties,
+        }
+    }
+}
+
+impl DisplayAs for HudiScanExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
+        let total_slices: usize = self.file_slice_partitions.iter().map(|p| 
p.len()).sum();
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "HudiScanExec: partitions={}, file_slices={}, 
projection={:?}, limit={:?}",
+                    self.file_slice_partitions.len(),
+                    total_slices,
+                    self.projection,
+                    self.limit,
+                )
+            }
+            _ => {
+                write!(f, "HudiScanExec")
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for HudiScanExec {
+    fn name(&self) -> &str {
+        "HudiScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            Err(Execution(
+                "HudiScanExec is a leaf node and does not accept 
children".to_string(),
+            ))
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<datafusion::execution::TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let file_slices = self
+            .file_slice_partitions
+            .get(partition)
+            .ok_or_else(|| {
+                Execution(format!(
+                    "HudiScanExec partition {partition} out of range (have 
{})",
+                    self.file_slice_partitions.len()
+                ))
+            })?
+            .clone();
+
+        let projected_schema = self.projected_schema.clone();
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                projected_schema,
+                futures::stream::empty(),
+            )));
+        }
+
+        let reader = self.file_group_reader.clone();
+        let options = self.read_options.clone();
+
+        let stream = stream::iter(file_slices)
+            .then(move |file_slice| {
+                let reader = reader.clone();
+                let options = options.clone();
+                async move {
+                    let inner_stream = reader
+                        .read_file_slice_stream(&file_slice, &options)
+                        .await
+                        .map_err(|e| {
+                        datafusion_common::DataFusionError::Execution(format!(
+                            "Failed to read file slice: {e}"
+                        ))
+                    })?;
+                    Ok::<_, 
datafusion_common::DataFusionError>(inner_stream.map_err(|e| {
+                        datafusion_common::DataFusionError::Execution(format!(
+                            "Failed to read batch: {e}"
+                        ))
+                    }))
+                }
+            })
+            .try_flatten();

Review Comment:
   Within a partition, file slices are processed sequentially because 
`.then(...)` awaits each slice before starting the next. This can significantly 
reduce throughput when a partition has many slices and IO latency is 
non-trivial. Consider adding bounded concurrency (e.g., buffering a limited 
number of in-flight slice reads) and making it configurable; this retains 
backpressure while improving scan parallelism.



##########
crates/datafusion/src/lib.rs:
##########
@@ -146,15 +150,14 @@ impl HudiDataSource {
             .await
             .map_err(|e| Execution(format!("Failed to create Hudi table: 
{e}")))?;
 
-        let base_file_format: String = table
-            .hudi_configs
-            .get_or_default(HudiTableConfig::BaseFileFormat)
-            .into();
-        if 
!base_file_format.eq_ignore_ascii_case(BaseFileFormatValue::Parquet.as_ref()) {
-            return Err(Execution(format!(
-                "Unsupported base file format '{base_file_format}' for 
HudiDataSource; only parquet is supported"
-            )));
-        }
+        let base_file_format: BaseFileFormatValue = {
+            let s: String = table
+                .hudi_configs
+                .get_or_default(HudiTableConfig::BaseFileFormat)
+                .into();
+            s.parse::<BaseFileFormatValue>()
+                .unwrap_or(BaseFileFormatValue::Parquet)
+        };

Review Comment:
   This silently falls back to Parquet when the table explicitly sets an 
invalid/unknown `hoodie.table.base.file.format`. That can route planning down 
the ParquetSource fast path and produce confusing runtime failures (or 
incorrect assumptions) for non-Parquet tables. Consider only defaulting to 
Parquet when the config is absent; if the config key is present but 
unparsable/unsupported, return an `Execution` error mentioning the bad value 
and expected values.
   



##########
crates/datafusion/src/lib.rs:
##########
@@ -204,13 +207,32 @@ impl HudiDataSource {
             partition_schema,
             cached_stats,
             input_partitions,
+            base_file_format,
         })
     }
 
     fn get_input_partitions(&self) -> usize {
         self.input_partitions
     }
 
+    /// Parquet COW and Parquet MOR read-optimized use ParquetSource for
+    /// native row-group/page pruning. Parquet MOR snapshot, Lance, and
+    /// all other cases use HudiScanExec.
+    fn use_parquet_source(&self) -> bool {
+        if !matches!(self.base_file_format, BaseFileFormatValue::Parquet) {

Review Comment:
   `Storage::create_base_file_reader` treats `BaseFileFormatValue::HFile` as a 
Parquet-backed reader, but `use_parquet_source()` disables the ParquetSource 
fast path for any non-Parquet enum value. If `HFile` is expected to be readable 
via Parquet in DataFusion (as the updated test suggests), consider treating 
`HFile` the same as Parquet here (or otherwise clarifying via config/intent), 
to avoid unexpectedly losing ParquetSource pruning and scan performance.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to