Copilot commented on code in PR #596:
URL: https://github.com/apache/hudi-rs/pull/596#discussion_r3203627865
##########
crates/core/Cargo.toml:
##########
@@ -81,6 +81,12 @@ prost = { workspace = true }
# compression
flate2 = { workspace = true }
+# lance
+lance-core = { version = "4.0.1" }
+lance-encoding = { version = "4.0.1" }
+lance-file = { version = "4.0.1" }
+lance-io = { version = "4.0.1" }
Review Comment:
The PR description says Lance support is behind an optional `lance` feature
flag, but these `lance-*` dependencies are currently unconditional (and there
is no `lance` feature in `[features]`). This forces all `hudi-core` consumers
to compile Lance (and pull its transitive deps) even when they only want
Parquet.
Consider marking `lance-*` deps as `optional = true`, adding a `lance`
feature, and gating the Lance reader modules / error variants with
`#[cfg(feature = "lance")]` (or update the PR description/docs if Lance is
intended to be mandatory).
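A minimal sketch of the gating, keeping the versions pinned in this diff (the `lance` feature name follows the PR description; `dep:` syntax needs cargo 1.60+):
```toml
# crates/core/Cargo.toml -- sketch, not verbatim
[dependencies]
lance-core = { version = "4.0.1", optional = true }
lance-encoding = { version = "4.0.1", optional = true }
lance-file = { version = "4.0.1", optional = true }
lance-io = { version = "4.0.1", optional = true }

[features]
# Opt-in Lance support: pulls in the lance-* crates and (via cfg)
# enables the Lance reader modules and error variants.
lance = ["dep:lance-core", "dep:lance-encoding", "dep:lance-file", "dep:lance-io"]
```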
##########
crates/core/src/storage/error.rs:
##########
@@ -49,6 +49,9 @@ pub enum StorageError {
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),
Review Comment:
`StorageError::LanceError` introduces an unconditional dependency on
`lance_core::Error`. If Lance is meant to remain optional, this variant (and
the associated `From` impl) should be behind `#[cfg(feature = "lance")]` (or
use a boxed error type / string message when the feature is disabled) so
non-Lance builds don't have to pull in Lance crates.
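For example (a sketch; assumes the `thiserror`-style derive already used for `ParquetError` passes `#[cfg]` through on variants):
```rust
// crates/core/src/storage/error.rs -- sketch
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    #[error(transparent)]
    ParquetError(#[from] parquet::errors::ParquetError),

    // Compiled only with `--features lance`, so non-Lance builds never
    // reference lance_core; #[from] still derives the From impl when enabled.
    #[cfg(feature = "lance")]
    #[error(transparent)]
    LanceError(#[from] lance_core::Error),
    // ... remaining variants unchanged
}
```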
##########
crates/core/src/config/table.rs:
##########
@@ -393,18 +395,17 @@ impl BaseFileFormatValue {
Some(Self::Parquet)
} else if Self::ends_with_ignore_ascii_case(path, ".hfile") {
Some(Self::HFile)
+ } else if Self::ends_with_ignore_ascii_case(path, ".lance") {
+ Some(Self::Lance)
} else {
None
}
}
/// Returns true when `path` has this format's base-file suffix.
pub fn matches_extension(&self, path: &str) -> bool {
- let suffix = match self {
- Self::Parquet => ".parquet",
- Self::HFile => ".hfile",
- };
- Self::ends_with_ignore_ascii_case(path, suffix)
+ let suffix = format!(".{}", self.as_ref());
+ Self::ends_with_ignore_ascii_case(path, &suffix)
Review Comment:
`BaseFileFormatValue::matches_extension` now allocates a new `String` on
every call via `format!(...)`. This is on the hot path for file listing /
filtering and can create avoidable per-file heap churn.
Prefer a non-allocating implementation (e.g., match on `self` to static
suffixes like ".parquet" / ".hfile" / ".lance") while keeping the
case-insensitive comparison logic.
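A non-allocating version is essentially the removed match with the new arm added, using `&'static str` suffixes:
```rust
/// Returns true when `path` has this format's base-file suffix.
pub fn matches_extension(&self, path: &str) -> bool {
    // Static suffixes: no per-call heap allocation, unlike format!(...)
    let suffix = match self {
        Self::Parquet => ".parquet",
        Self::HFile => ".hfile",
        Self::Lance => ".lance",
    };
    Self::ends_with_ignore_ascii_case(path, suffix)
}
```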
##########
crates/core/src/schema/resolver.rs:
##########
@@ -20,6 +20,7 @@ use crate::avro_to_arrow::to_arrow_schema;
use crate::config::table::BaseFileFormatValue;
use crate::config::table::HudiTableConfig;
use crate::error::{CoreError, Result};
+use crate::file_group::base_file::lance::LanceBaseFileReader;
use crate::file_group::base_file::parquet::ParquetBaseFileReader;
use crate::metadata::commit::HoodieCommitMetadata;
Review Comment:
This file now directly imports and uses `LanceBaseFileReader`. If Lance
support is intended to be behind an optional `lance` feature, this import/match
arm needs `#[cfg(feature = "lance")]` (and the `Lance` match arm should either
be behind the same cfg or return a clear `CoreError` when the feature is
disabled). Otherwise, `hudi-core` will require Lance crates in all builds.
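One possible shape (a sketch only: the dispatch site, the helper names, and the `CoreError::Unsupported` variant are illustrative, not this file's actual code):
```rust
#[cfg(feature = "lance")]
use crate::file_group::base_file::lance::LanceBaseFileReader;

// Hypothetical dispatch on the table's base file format:
match base_file_format {
    BaseFileFormatValue::Parquet => resolve_parquet_schema(/* ... */),
    #[cfg(feature = "lance")]
    BaseFileFormatValue::Lance => resolve_lance_schema(/* uses LanceBaseFileReader */),
    // Without the feature, fail with a clear error instead of failing to compile.
    #[cfg(not(feature = "lance"))]
    BaseFileFormatValue::Lance => Err(CoreError::Unsupported(
        "reading .lance base files requires the `lance` feature of hudi-core".to_string(),
    )),
    other => Err(CoreError::Unsupported(format!("unsupported base file format: {other:?}"))),
}
```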
##########
crates/core/src/file_group/base_file/mod.rs:
##########
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+pub mod lance;
pub mod parquet;
pub mod reader;
Review Comment:
`pub mod lance;` is unconditional, so `hudi-core` will always compile the
Lance reader module (and thus require the `lance-*` dependencies). If Lance
support is intended to be feature-gated, gate this module export with
`#[cfg(feature = "lance")]` and provide a clear error path when the feature is
disabled but a `.lance` file is encountered.
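The gated export would simply be:
```rust
// crates/core/src/file_group/base_file/mod.rs -- sketch
#[cfg(feature = "lance")]
pub mod lance;
pub mod parquet;
pub mod reader;
```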
##########
crates/datafusion/src/lib.rs:
##########
@@ -46,13 +47,19 @@ use datafusion_expr::{CreateExternalTable, Expr, TableProviderFilterPushDown, Ta
use datafusion_physical_expr::create_physical_expr;
use log::warn;
+use crate::hudi_exec::HudiScanExec;
use crate::util::expr::exprs_to_filters;
-use hudi_core::config::read::HudiReadConfig::{InputPartitions, UseReadOptimizedMode};
+use hudi_core::config::read::HudiReadConfig::{
+ FileSliceReadConcurrency, InputPartitions, UseReadOptimizedMode,
+};
use hudi_core::config::table::{BaseFileFormatValue, HudiTableConfig};
use hudi_core::config::util::empty_options;
+use hudi_core::file_group::file_slice::FileSlice;
use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
use hudi_core::table::{ReadOptions, Table as HudiTable};
+const DEFAULT_FILE_SLICE_READ_CONCURRENCY: usize = 4;
Review Comment:
`DEFAULT_FILE_SLICE_READ_CONCURRENCY` duplicates the default defined in
`hudi-core`'s `HudiReadConfig::FileSliceReadConcurrency` (currently also 4).
This risks drift if the core default changes.
Consider deriving the default via
`FileSliceReadConcurrency.parse_value_or_default(...)` (or re-exporting a
single shared default) so the DataFusion provider stays consistent with core
config semantics.
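For instance (a sketch; assumes the `parse_value_or_default` / `.to::<T>()` API referenced above, with `options` standing in for the provider's read-options map):
```rust
// crates/datafusion/src/lib.rs -- sketch; no locally duplicated constant,
// so the default lives only in hudi-core's config definition.
let file_slice_read_concurrency = FileSliceReadConcurrency
    .parse_value_or_default(&options)
    .to::<usize>();
```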
##########
crates/datafusion/src/hudi_exec.rs:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//! Custom DataFusion execution plan for reading Hudi tables through
+//! [`FileGroupReader`], supporting all base file formats and MOR log merging.
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream,
+};
+use datafusion_common::DataFusionError::Execution;
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, Result, Statistics};
+use futures::stream::TryStreamExt;
+use futures::{StreamExt, stream};
+
+use hudi_core::file_group::file_slice::FileSlice;
+use hudi_core::file_group::reader::FileGroupReader;
+use hudi_core::table::ReadOptions;
+
+/// DataFusion execution plan that reads Hudi file slices through
+/// [`FileGroupReader`].
+///
+/// Used for non-Parquet base file formats (Lance) and MOR snapshot
+/// queries where base + log file merging is required. Parquet-only
+/// COW and MOR read-optimized queries continue to use DataFusion's
+/// native `ParquetSource` path for row-group/page-level pruning.
+#[derive(Debug)]
+pub struct HudiScanExec {
+ file_slice_partitions: Vec<Vec<FileSlice>>,
+ file_group_reader: Arc<FileGroupReader>,
+ read_options: ReadOptions,
+ file_slice_read_concurrency: usize,
+ projected_schema: SchemaRef,
+ projection: Option<Vec<usize>>,
+ properties: PlanProperties,
+}
+
+impl HudiScanExec {
+ pub fn new(
+ file_slice_partitions: Vec<Vec<FileSlice>>,
+ file_group_reader: Arc<FileGroupReader>,
+ read_options: ReadOptions,
+ file_slice_read_concurrency: usize,
+ schema: SchemaRef,
+ projection: Option<Vec<usize>>,
+ ) -> Self {
+ let projected_schema = if let Some(ref proj) = projection {
+            let fields: Vec<_> = proj.iter().map(|&i| schema.field(i).clone()).collect();
+ Arc::new(arrow_schema::Schema::new(fields))
+ } else {
+ schema.clone()
+ };
+
+ let partitions = if file_slice_partitions.is_empty() {
+ vec![vec![]]
+ } else {
+ file_slice_partitions
+ };
+ let n_partitions = partitions.len();
+
+ let properties = PlanProperties::new(
+            datafusion::physical_expr::EquivalenceProperties::new(projected_schema.clone()),
+ Partitioning::UnknownPartitioning(n_partitions),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
+ );
+
+ Self {
+ file_slice_partitions: partitions,
+ file_group_reader,
+ read_options,
+ file_slice_read_concurrency: file_slice_read_concurrency.max(1),
+ projected_schema,
+ projection,
+ properties,
+ }
+ }
+}
+
+impl DisplayAs for HudiScanExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let total_slices: usize = self.file_slice_partitions.iter().map(|p| p.len()).sum();
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+                    "HudiScanExec: partitions={}, file_slices={}, file_slice_read_concurrency={}, projection={:?}",
+ self.file_slice_partitions.len(),
+ total_slices,
+ self.file_slice_read_concurrency,
+ self.projection,
+ )
+ }
+ _ => {
+ write!(f, "HudiScanExec")
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for HudiScanExec {
+ fn name(&self) -> &str {
+ "HudiScanExec"
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if children.is_empty() {
+ Ok(self)
+ } else {
+ Err(Execution(
+                "HudiScanExec is a leaf node and does not accept children".to_string(),
+ ))
+ }
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ _context: Arc<datafusion::execution::TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let file_slices = self
+ .file_slice_partitions
+ .get(partition)
+ .ok_or_else(|| {
+ Execution(format!(
+                    "HudiScanExec partition {partition} out of range (have {})",
+ self.file_slice_partitions.len()
+ ))
+ })?
+ .clone();
+
+ let projected_schema = self.projected_schema.clone();
+
+ if file_slices.is_empty() {
+ return Ok(Box::pin(RecordBatchStreamAdapter::new(
+ projected_schema,
+ futures::stream::empty(),
+ )));
+ }
+
+ let reader = self.file_group_reader.clone();
+ let options = self.read_options.clone();
+ let concurrency = self
+ .file_slice_read_concurrency
+ .min(file_slices.len())
+ .max(1);
+
+ let stream = stream::iter(file_slices)
+ .map(move |file_slice| {
+ let reader = reader.clone();
+ let options = options.clone();
+ async move {
+ let inner_stream = reader
+ .read_file_slice_stream(&file_slice, &options)
+ .await
+ .map_err(|e| {
+ datafusion_common::DataFusionError::Execution(format!(
+ "Failed to read file slice: {e}"
+ ))
+ })?;
+                    Ok::<_, datafusion_common::DataFusionError>(inner_stream.map_err(|e| {
+ datafusion_common::DataFusionError::Execution(format!(
+ "Failed to read batch: {e}"
+ ))
+ }))
+ }
+ })
+ .buffered(concurrency)
Review Comment:
`stream::iter(...).buffered(concurrency)` preserves input ordering, which
can cause head-of-line blocking (a slow first file slice prevents later slices
from starting to stream) even though the scan output is explicitly unordered.
To maximize parallelism, consider using `buffer_unordered(concurrency)` here
(and keep `try_flatten_unordered` / unordered flattening), since ordering is
not required.
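A sketch of that change, reusing the surrounding names (`reader`, `options`, `concurrency`, and the `Execution` variant imported above):
```rust
// Sketch: unordered buffering avoids head-of-line blocking; a slow first
// file slice no longer delays batches from slices that finish earlier.
let stream = stream::iter(file_slices)
    .map(move |file_slice| {
        let reader = reader.clone();
        let options = options.clone();
        async move {
            let inner = reader
                .read_file_slice_stream(&file_slice, &options)
                .await
                .map_err(|e| Execution(format!("Failed to read file slice: {e}")))?;
            Ok::<_, datafusion_common::DataFusionError>(
                inner.map_err(|e| Execution(format!("Failed to read batch: {e}"))),
            )
        }
    })
    // Poll up to `concurrency` slice-open futures, yielding in completion order.
    .buffer_unordered(concurrency)
    // Interleave batches across the inner streams; ordering is not required here.
    .try_flatten_unordered(None);
```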