Copilot commented on code in PR #596:
URL: https://github.com/apache/hudi-rs/pull/596#discussion_r3203627865


##########
crates/core/Cargo.toml:
##########
@@ -81,6 +81,12 @@ prost = { workspace = true }
 # compression
 flate2 = { workspace = true }
 
+# lance
+lance-core = { version = "4.0.1" }
+lance-encoding = { version = "4.0.1" }
+lance-file = { version = "4.0.1" }
+lance-io = { version = "4.0.1" }

Review Comment:
   The PR description says Lance support is behind an optional `lance` feature 
flag, but these `lance-*` dependencies are currently unconditional (and there 
is no `lance` feature in `[features]`). This forces all `hudi-core` consumers 
to compile Lance (and pull its transitive deps) even when they only want 
Parquet.
   
   Consider marking `lance-*` deps as `optional = true`, adding a `lance` 
feature, and gating the Lance reader modules / error variants with 
`#[cfg(feature = "lance")]` (or update the PR description/docs if Lance is 
intended to be mandatory).



##########
crates/core/src/storage/error.rs:
##########
@@ -49,6 +49,9 @@ pub enum StorageError {
     #[error(transparent)]
     ParquetError(#[from] parquet::errors::ParquetError),
 

Review Comment:
   `StorageError::LanceError` introduces an unconditional dependency on 
`lance_core::Error`. If Lance is meant to remain optional, this variant (and 
the associated `From` impl) should be behind `#[cfg(feature = "lance")]` (or 
use a boxed error type / string message when the feature is disabled) so 
non-Lance builds don't have to pull in Lance crates.
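
   A minimal sketch of the boxed-error option mentioned above, using only the standard library. The enum here is a hypothetical stand-in for `StorageError`, and the `Format` variant and `lance_error` helper are illustrative names, not part of this PR. The point is that the enum never names `lance_core::Error`, so it compiles whether or not the Lance crates are present.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for `StorageError`; only the shape matters here.
#[derive(Debug)]
pub enum StorageError {
    /// Format-specific failure kept as a boxed trait object, so this enum
    /// does not reference any optional crate's error type.
    Format {
        format: &'static str,
        source: Box<dyn Error + Send + Sync>,
    },
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Format { format, source } => write!(f, "{format} error: {source}"),
        }
    }
}

impl Error for StorageError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Format { source, .. } => {
                // Drop the Send + Sync bounds to match the trait signature.
                let inner: &(dyn Error + 'static) = &**source;
                Some(inner)
            }
        }
    }
}

/// Illustrative helper: wraps any error raised on a Lance code path
/// without requiring a `From<lance_core::Error>` impl on the enum.
pub fn lance_error<E>(err: E) -> StorageError
where
    E: Into<Box<dyn Error + Send + Sync>>,
{
    StorageError::Format {
        format: "lance",
        source: err.into(),
    }
}
```

   The trade-off is that callers can no longer match on a concrete Lance error type without downcasting, which may or may not be acceptable for this crate.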
   



##########
crates/core/src/config/table.rs:
##########
@@ -393,18 +395,17 @@ impl BaseFileFormatValue {
             Some(Self::Parquet)
         } else if Self::ends_with_ignore_ascii_case(path, ".hfile") {
             Some(Self::HFile)
+        } else if Self::ends_with_ignore_ascii_case(path, ".lance") {
+            Some(Self::Lance)
         } else {
             None
         }
     }
 
     /// Returns true when `path` has this format's base-file suffix.
     pub fn matches_extension(&self, path: &str) -> bool {
-        let suffix = match self {
-            Self::Parquet => ".parquet",
-            Self::HFile => ".hfile",
-        };
-        Self::ends_with_ignore_ascii_case(path, suffix)
+        let suffix = format!(".{}", self.as_ref());
+        Self::ends_with_ignore_ascii_case(path, &suffix)

Review Comment:
   `BaseFileFormatValue::matches_extension` now allocates a new `String` on 
every call via `format!(...)`. This is on the hot path for file listing / 
filtering and can create avoidable per-file heap churn.
   
   Prefer a non-allocating implementation (e.g., match on `self` to static 
suffixes like ".parquet" / ".hfile" / ".lance") while keeping the 
case-insensitive comparison logic.
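
   One way the non-allocating version could look, as a sketch. The enum below is a reduced stand-in for `BaseFileFormatValue`, the `suffix` helper is a hypothetical name, and the body of `ends_with_ignore_ascii_case` is an assumption, since the real helper is not shown in this hunk.

```rust
/// Reduced stand-in for `BaseFileFormatValue`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BaseFileFormatValue {
    Parquet,
    HFile,
    Lance,
}

impl BaseFileFormatValue {
    /// Static suffix per variant (hypothetical helper); no heap allocation.
    const fn suffix(&self) -> &'static str {
        match self {
            Self::Parquet => ".parquet",
            Self::HFile => ".hfile",
            Self::Lance => ".lance",
        }
    }

    /// Assumed implementation: byte-wise, ASCII case-insensitive suffix check.
    fn ends_with_ignore_ascii_case(path: &str, suffix: &str) -> bool {
        let (p, s) = (path.as_bytes(), suffix.as_bytes());
        p.len() >= s.len() && p[p.len() - s.len()..].eq_ignore_ascii_case(s)
    }

    /// Returns true when `path` has this format's base-file suffix.
    pub fn matches_extension(&self, path: &str) -> bool {
        Self::ends_with_ignore_ascii_case(path, self.suffix())
    }
}
```

   Because the `match` is exhaustive, adding a new format variant fails to compile until its suffix is listed, which keeps the convenience the `format!` version was after.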
   



##########
crates/core/src/schema/resolver.rs:
##########
@@ -20,6 +20,7 @@ use crate::avro_to_arrow::to_arrow_schema;
 use crate::config::table::BaseFileFormatValue;
 use crate::config::table::HudiTableConfig;
 use crate::error::{CoreError, Result};
+use crate::file_group::base_file::lance::LanceBaseFileReader;
 use crate::file_group::base_file::parquet::ParquetBaseFileReader;
 use crate::metadata::commit::HoodieCommitMetadata;

Review Comment:
   This file now directly imports and uses `LanceBaseFileReader`. If Lance 
support is intended to be behind an optional `lance` feature, this import/match 
arm needs `#[cfg(feature = "lance")]` (and the `Lance` match arm should either 
be behind the same cfg or return a clear `CoreError` when the feature is 
disabled). Otherwise, `hudi-core` will require Lance crates in all builds.



##########
crates/core/src/file_group/base_file/mod.rs:
##########
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+pub mod lance;
 pub mod parquet;
 pub mod reader;

Review Comment:
   `pub mod lance;` is unconditional, so `hudi-core` will always compile the 
Lance reader module (and thus require the `lance-*` dependencies). If Lance 
support is intended to be feature-gated, gate this module export with 
`#[cfg(feature = "lance")]` and provide a clear error path when the feature is 
disabled but a `.lance` file is encountered.
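
   A sketch of the gating pattern with an explicit error path, assuming a `lance` feature is declared in `Cargo.toml`. All names here (`ReadError`, `read_lance`, `read_row_count`) are hypothetical and the gated body is a placeholder; the real reader and error types in `hudi-core` differ.

```rust
/// Hypothetical error type standing in for `CoreError`.
#[derive(Debug, PartialEq, Eq)]
pub enum ReadError {
    UnsupportedFormat(String),
}

/// Compiled only when the `lance` feature is on. A real implementation
/// would call into the Lance reader module here; this is a placeholder.
#[cfg(feature = "lance")]
fn read_lance(_path: &str) -> Result<usize, ReadError> {
    Ok(0)
}

/// Fallback compiled when the feature is off: same signature, clear error.
#[cfg(not(feature = "lance"))]
fn read_lance(path: &str) -> Result<usize, ReadError> {
    Err(ReadError::UnsupportedFormat(format!(
        "`{path}` is a Lance file, but this build was compiled without the `lance` feature"
    )))
}

/// Dispatches on the file suffix; callers never need their own `cfg` checks.
pub fn read_row_count(path: &str) -> Result<usize, ReadError> {
    if path.to_ascii_lowercase().ends_with(".lance") {
        read_lance(path)
    } else {
        Err(ReadError::UnsupportedFormat(format!(
            "no reader registered for `{path}`"
        )))
    }
}
```

   Keeping both `cfg` branches behind one function signature means call sites such as the schema resolver stay free of feature checks.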



##########
crates/datafusion/src/lib.rs:
##########
@@ -46,13 +47,19 @@ use datafusion_expr::{CreateExternalTable, Expr, TableProviderFilterPushDown, Ta
 use datafusion_physical_expr::create_physical_expr;
 use log::warn;
 
+use crate::hudi_exec::HudiScanExec;
 use crate::util::expr::exprs_to_filters;
-use hudi_core::config::read::HudiReadConfig::{InputPartitions, UseReadOptimizedMode};
+use hudi_core::config::read::HudiReadConfig::{
+    FileSliceReadConcurrency, InputPartitions, UseReadOptimizedMode,
+};
 use hudi_core::config::table::{BaseFileFormatValue, HudiTableConfig};
 use hudi_core::config::util::empty_options;
+use hudi_core::file_group::file_slice::FileSlice;
 use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
 use hudi_core::table::{ReadOptions, Table as HudiTable};
 
+const DEFAULT_FILE_SLICE_READ_CONCURRENCY: usize = 4;

Review Comment:
   `DEFAULT_FILE_SLICE_READ_CONCURRENCY` duplicates the default defined in 
`hudi-core`'s `HudiReadConfig::FileSliceReadConcurrency` (currently also 4). 
This risks drift if the core default changes.
   
   Consider deriving the default via 
`FileSliceReadConcurrency.parse_value_or_default(...)` (or re-exporting a 
single shared default) so the DataFusion provider stays consistent with core 
config semantics.
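
   A sketch of the single-source-of-default idea. `ReadConfig` is a hypothetical stand-in for the core config enum, the key string is made up for illustration, and the `parse_value_or_default` signature shown is an assumption modelled on the name above, not the actual `hudi-core` API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a core read-config key.
pub enum ReadConfig {
    FileSliceReadConcurrency,
}

impl ReadConfig {
    /// Illustrative key name, not a real Hudi option.
    pub const fn key(&self) -> &'static str {
        match self {
            Self::FileSliceReadConcurrency => "example.read.file_slice_concurrency",
        }
    }

    /// The one place the default lives.
    pub const fn default_value(&self) -> usize {
        match self {
            Self::FileSliceReadConcurrency => 4,
        }
    }

    /// Parses the option if present and valid; otherwise uses the default.
    pub fn parse_value_or_default(&self, options: &HashMap<String, String>) -> usize {
        options
            .get(self.key())
            .and_then(|v| v.trim().parse().ok())
            .unwrap_or(self.default_value())
    }
}

/// What a downstream provider would call instead of keeping its own constant.
pub fn provider_concurrency(options: &HashMap<String, String>) -> usize {
    ReadConfig::FileSliceReadConcurrency
        .parse_value_or_default(options)
        .max(1)
}
```

   In this sketch an unparsable value silently falls back to the default; surfacing it as an error instead would be an equally reasonable choice.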
   



##########
crates/datafusion/src/hudi_exec.rs:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Custom DataFusion execution plan for reading Hudi tables through
+//! [`FileGroupReader`], supporting all base file formats and MOR log merging.
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream,
+};
+use datafusion_common::DataFusionError::Execution;
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, Result, Statistics};
+use futures::stream::TryStreamExt;
+use futures::{StreamExt, stream};
+
+use hudi_core::file_group::file_slice::FileSlice;
+use hudi_core::file_group::reader::FileGroupReader;
+use hudi_core::table::ReadOptions;
+
+/// DataFusion execution plan that reads Hudi file slices through
+/// [`FileGroupReader`].
+///
+/// Used for non-Parquet base file formats (Lance) and MOR snapshot
+/// queries where base + log file merging is required. Parquet-only
+/// COW and MOR read-optimized queries continue to use DataFusion's
+/// native `ParquetSource` path for row-group/page-level pruning.
+#[derive(Debug)]
+pub struct HudiScanExec {
+    file_slice_partitions: Vec<Vec<FileSlice>>,
+    file_group_reader: Arc<FileGroupReader>,
+    read_options: ReadOptions,
+    file_slice_read_concurrency: usize,
+    projected_schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    properties: PlanProperties,
+}
+
+impl HudiScanExec {
+    pub fn new(
+        file_slice_partitions: Vec<Vec<FileSlice>>,
+        file_group_reader: Arc<FileGroupReader>,
+        read_options: ReadOptions,
+        file_slice_read_concurrency: usize,
+        schema: SchemaRef,
+        projection: Option<Vec<usize>>,
+    ) -> Self {
+        let projected_schema = if let Some(ref proj) = projection {
+            let fields: Vec<_> = proj.iter().map(|&i| schema.field(i).clone()).collect();
+            Arc::new(arrow_schema::Schema::new(fields))
+        } else {
+            schema.clone()
+        };
+
+        let partitions = if file_slice_partitions.is_empty() {
+            vec![vec![]]
+        } else {
+            file_slice_partitions
+        };
+        let n_partitions = partitions.len();
+
+        let properties = PlanProperties::new(
+            datafusion::physical_expr::EquivalenceProperties::new(projected_schema.clone()),
+            Partitioning::UnknownPartitioning(n_partitions),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+
+        Self {
+            file_slice_partitions: partitions,
+            file_group_reader,
+            read_options,
+            file_slice_read_concurrency: file_slice_read_concurrency.max(1),
+            projected_schema,
+            projection,
+            properties,
+        }
+    }
+}
+
+impl DisplayAs for HudiScanExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let total_slices: usize = self.file_slice_partitions.iter().map(|p| p.len()).sum();
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "HudiScanExec: partitions={}, file_slices={}, file_slice_read_concurrency={}, projection={:?}",
+                    self.file_slice_partitions.len(),
+                    total_slices,
+                    self.file_slice_read_concurrency,
+                    self.projection,
+                )
+            }
+            _ => {
+                write!(f, "HudiScanExec")
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for HudiScanExec {
+    fn name(&self) -> &str {
+        "HudiScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            Err(Execution(
+                "HudiScanExec is a leaf node and does not accept children".to_string(),
+            ))
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<datafusion::execution::TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let file_slices = self
+            .file_slice_partitions
+            .get(partition)
+            .ok_or_else(|| {
+                Execution(format!(
+                    "HudiScanExec partition {partition} out of range (have {})",
+                    self.file_slice_partitions.len()
+                ))
+            })?
+            .clone();
+
+        let projected_schema = self.projected_schema.clone();
+
+        if file_slices.is_empty() {
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                projected_schema,
+                futures::stream::empty(),
+            )));
+        }
+
+        let reader = self.file_group_reader.clone();
+        let options = self.read_options.clone();
+        let concurrency = self
+            .file_slice_read_concurrency
+            .min(file_slices.len())
+            .max(1);
+
+        let stream = stream::iter(file_slices)
+            .map(move |file_slice| {
+                let reader = reader.clone();
+                let options = options.clone();
+                async move {
+                    let inner_stream = reader
+                        .read_file_slice_stream(&file_slice, &options)
+                        .await
+                        .map_err(|e| {
+                        datafusion_common::DataFusionError::Execution(format!(
+                            "Failed to read file slice: {e}"
+                        ))
+                    })?;
+                    Ok::<_, datafusion_common::DataFusionError>(inner_stream.map_err(|e| {
+                        datafusion_common::DataFusionError::Execution(format!(
+                            "Failed to read batch: {e}"
+                        ))
+                    }))
+                }
+            })
+            .buffered(concurrency)

Review Comment:
   `stream::iter(...).buffered(concurrency)` preserves input ordering, which 
can cause head-of-line blocking (a slow first file slice prevents later slices 
from starting to stream) even though the scan output is explicitly unordered.
   
   To maximize parallelism, consider using `buffer_unordered(concurrency)` here 
(and keep `try_flatten_unordered` / unordered flattening), since ordering is 
not required.
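
   To make the ordering point concrete, here is a standard-library analogy rather than the `futures` code itself: threads stand in for the per-slice futures and an `mpsc` channel stands in for completion-order delivery, which is the behaviour `buffer_unordered` provides. All names are illustrative. Task 0 is held back deliberately, yet the results of tasks 1 and 2 are consumed first.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs three tasks where task 0 is "slow" (it waits for a release signal)
/// and returns the order in which results were consumed.
pub fn consume_in_completion_order() -> Vec<usize> {
    let (result_tx, result_rx) = mpsc::channel::<usize>();
    let (release_tx, release_rx) = mpsc::channel::<()>();

    // Task 0: cannot finish until the consumer releases it.
    let tx0 = result_tx.clone();
    let slow = thread::spawn(move || {
        release_rx.recv().expect("release signal");
        tx0.send(0).expect("receiver alive");
    });

    // Tasks 1 and 2: finish immediately.
    let fast: Vec<_> = (1..3)
        .map(|i| {
            let tx = result_tx.clone();
            thread::spawn(move || tx.send(i).expect("receiver alive"))
        })
        .collect();
    drop(result_tx);

    // Completion-order consumption: the fast results arrive even though
    // task 0, first in input order, has not finished yet.
    let mut order = Vec::new();
    order.push(result_rx.recv().expect("fast result"));
    order.push(result_rx.recv().expect("fast result"));

    // Only now let the slow task finish, then consume its result.
    release_tx.send(()).expect("slow task alive");
    order.push(result_rx.recv().expect("slow result"));

    slow.join().expect("slow task panicked");
    for handle in fast {
        handle.join().expect("fast task panicked");
    }
    order
}
```

   A consumer that insists on input order would have to wait on task 0 before yielding anything, which is the head-of-line blocking described above.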
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
