This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 98a8bc0 feat: support column projection and row filter pushdown (#510)
98a8bc0 is described below
commit 98a8bc0768893dc973a0776068eea01dd455d8ee
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jan 5 09:46:25 2026 -0600
feat: support column projection and row filter pushdown (#510)
---
crates/core/src/file_group/reader.rs | 56 ++++-
crates/core/src/storage/error.rs | 3 +
crates/core/src/storage/mod.rs | 34 ++-
crates/core/src/table/mod.rs | 5 +-
crates/core/src/table/read_options.rs | 28 ++-
crates/core/tests/table_read_tests.rs | 159 ++++++++++++
crates/datafusion/Cargo.toml | 3 +
crates/datafusion/src/lib.rs | 457 ++++++++++++++++++----------------
crates/datafusion/src/util/expr.rs | 406 ++++++++++++++++++++++++++++--
crates/datafusion/tests/read_tests.rs | 404 ++++++++++++++++++++++++++++++
10 files changed, 1293 insertions(+), 262 deletions(-)
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 005b48f..8de9d86 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -373,10 +373,10 @@ impl FileGroupReader {
/// Reads a base file as a stream of record batches.
///
- /// # Limitations
- ///
- /// Currently only `batch_size` from [ReadOptions] is used. The `projection` and
- /// `row_predicate` fields are not yet implemented.
+ /// Supports the following [ReadOptions]:
+ /// - `batch_size`: Controls the number of rows per batch
+ /// - `projection`: Pushes column selection to the parquet reader level
+ /// - `row_predicate`: Filters rows after reading each batch
async fn read_base_file_stream(
&self,
relative_path: &str,
@@ -400,10 +400,17 @@ impl FileGroupReader {
.get_or_default(HudiReadConfig::StreamBatchSize)
.into();
let batch_size = options.batch_size.unwrap_or(default_batch_size);
- let parquet_options = ParquetReadOptions::new().with_batch_size(batch_size);
+ let mut parquet_options = ParquetReadOptions::new().with_batch_size(batch_size);
+
+ // Add projection pushdown using column names (converted to indices internally
+ // by get_parquet_file_stream using the same schema the projection is applied to)
+ if let Some(ref projection_names) = options.projection {
+ parquet_options = parquet_options.with_projection(projection_names.clone());
+ }
let hudi_configs = self.hudi_configs.clone();
let path = relative_path.to_string();
+ let row_predicate = options.row_predicate.clone();
let parquet_stream = self
.storage
@@ -411,19 +418,36 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to read path
{path}: {e:?}")))
.await?;
- // Apply the same filtering logic as read_file_slice_by_base_file_path
+ // Apply filtering: commit time filter first, then row predicate
let stream = parquet_stream.into_stream().filter_map(move |result| {
let hudi_configs = hudi_configs.clone();
+ let row_predicate = row_predicate.clone();
async move {
match result {
Err(e) => Some(Err(ReadFileSliceError(format!(
"Failed to read batch: {e:?}"
)))),
- Ok(batch) => match apply_commit_time_filter(&hudi_configs, batch) {
- Err(e) => Some(Err(e)),
- Ok(filtered) if filtered.num_rows() > 0 => Some(Ok(filtered)),
- Ok(_) => None,
- },
+ Ok(batch) => {
+ // Apply commit time filter
+ let filtered = match apply_commit_time_filter(&hudi_configs, batch) {
+ Err(e) => return Some(Err(e)),
+ Ok(b) if b.num_rows() == 0 => return None,
+ Ok(b) => b,
+ };
+
+ // Apply row predicate if present
+ let final_batch = if let Some(ref predicate) = row_predicate {
+ match apply_row_predicate(predicate.as_ref(), filtered) {
+ Err(e) => return Some(Err(e)),
+ Ok(b) if b.num_rows() == 0 => return None,
+ Ok(b) => b,
+ }
+ } else {
+ filtered
+ };
+
+ Some(Ok(final_batch))
+ }
}
}
});
@@ -595,6 +619,16 @@ fn apply_commit_time_filter(hudi_configs: &HudiConfigs, batch: RecordBatch) -> R
}
}
+/// Apply a row predicate to filter records in a batch.
+fn apply_row_predicate(
+ predicate: &(dyn Fn(&RecordBatch) -> Result<BooleanArray> + Send + Sync),
+ batch: RecordBatch,
+) -> Result<RecordBatch> {
+ let mask = predicate(&batch)?;
+ filter_record_batch(&batch, &mask)
+ .map_err(|e| ReadFileSliceError(format!("Failed to apply row
predicate: {e:?}")))
+}
+
#[cfg(test)]
mod tests {
use super::*;
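For illustration, a minimal sketch of how the new streaming read options compose, using the sample schema from the tests (`id`, `name`, `isActive`); the snapshot-stream call at the end is assumed from the new tests below:

    use arrow_array::{BooleanArray, RecordBatch};
    use hudi_core::table::ReadOptions;

    // Project two columns and keep only rows where isActive is true.
    // The projection is pushed to the parquet reader; the predicate runs on each
    // batch after it, so the predicate may only reference projected columns.
    let options = ReadOptions::new()
        .with_projection(["id", "isActive"])
        .with_row_predicate(|batch: &RecordBatch| {
            let col = batch
                .column_by_name("isActive")
                .ok_or_else(|| hudi_core::error::CoreError::Schema("isActive not found".into()))?;
            let mask = col
                .as_any()
                .downcast_ref::<BooleanArray>()
                .ok_or_else(|| hudi_core::error::CoreError::Schema("Not boolean".into()))?;
            Ok(mask.clone())
        })
        .with_batch_size(4096);
    // let mut stream = hudi_table.read_snapshot_stream(&options).await?;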
diff --git a/crates/core/src/storage/error.rs b/crates/core/src/storage/error.rs
index a29dd6a..f6c28f0 100644
--- a/crates/core/src/storage/error.rs
+++ b/crates/core/src/storage/error.rs
@@ -31,6 +31,9 @@ pub enum StorageError {
#[error("Invalid path: {0}")]
InvalidPath(String),
+ #[error("Invalid column: {0}")]
+ InvalidColumn(String),
+
#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 14af978..e9650d9 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -54,8 +54,8 @@ pub mod util;
pub struct ParquetReadOptions {
/// Target batch size (number of rows per batch).
pub batch_size: Option<usize>,
- /// Column projection (indices of columns to read).
- pub projection: Option<Vec<usize>>,
+ /// Column projection by names.
+ pub projection: Option<Vec<String>>,
}
impl ParquetReadOptions {
@@ -68,8 +68,13 @@ impl ParquetReadOptions {
self
}
- pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
- self.projection = Some(projection);
+ /// Sets column projection by column names.
+ pub fn with_projection<I, S>(mut self, columns: I) -> Self
+ where
+ I: IntoIterator<Item = S>,
+ S: Into<String>,
+ {
+ self.projection = Some(columns.into_iter().map(|s| s.into()).collect());
self
}
}
@@ -297,7 +302,26 @@ impl Storage {
builder = builder.with_batch_size(batch_size);
}
- if let Some(projection) = options.projection {
+ // Handle projection: convert column names to indices using builder's schema
+ if let Some(ref column_names) = options.projection {
+ let arrow_schema = builder.schema();
+ let projection: Vec<usize> = column_names
+ .iter()
+ .map(|name| {
+ arrow_schema.index_of(name).map_err(|_| {
+ let available = arrow_schema
+ .fields()
+ .iter()
+ .map(|f| f.name().as_str())
+ .collect::<Vec<_>>()
+ .join(", ");
+ StorageError::InvalidColumn(format!(
+ "Column '{name}' not found in parquet file schema.
Available columns: [{available}]"
+ ))
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+
let projection_mask = parquet::arrow::ProjectionMask::roots(
builder.parquet_schema(),
projection.iter().copied(),
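For illustration, a minimal sketch of the name-based projection on `ParquetReadOptions`; the import path is an assumption, while the builder calls mirror the change above:

    // Path shown for illustration only; adjust to wherever ParquetReadOptions is exposed.
    use hudi_core::storage::ParquetReadOptions;

    // Names are resolved to top-level column indices against the file schema inside
    // get_parquet_file_stream; unknown names surface as StorageError::InvalidColumn.
    let opts = ParquetReadOptions::new()
        .with_batch_size(1024)
        .with_projection(["id", "name"]);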
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index b85c2b5..f02fd3e 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -911,17 +911,18 @@ impl Table {
)])?;
// Extract options to pass to each file slice read.
- // Note: row_predicate is not yet supported in streaming base file reads.
let batch_size = options.batch_size;
let projection = options.projection.clone();
+ let row_predicate = options.row_predicate.clone();
let streams_iter = file_slices.into_iter().map(move |file_slice| {
let fg_reader = fg_reader.clone();
let projection = projection.clone();
+ let row_predicate = row_predicate.clone();
let options = ReadOptions {
partition_filters: vec![],
projection,
- row_predicate: None, // Not yet implemented in streaming reads
+ row_predicate,
batch_size,
as_of_timestamp: None,
};
diff --git a/crates/core/src/table/read_options.rs b/crates/core/src/table/read_options.rs
index 499546d..f2ca99d 100644
--- a/crates/core/src/table/read_options.rs
+++ b/crates/core/src/table/read_options.rs
@@ -18,10 +18,14 @@
*/
//! Read options for streaming reads.
+use std::sync::Arc;
+
use arrow_array::{BooleanArray, RecordBatch};
/// A row-level predicate function for filtering records.
-pub type RowPredicate = Box<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
+///
+/// Uses `Arc` instead of `Box` to allow cloning for async streaming contexts.
+pub type RowPredicate = Arc<dyn Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync>;
/// A partition filter tuple: (field_name, operator, value).
/// Example: ("city", "=", "san_francisco")
@@ -36,15 +40,14 @@ pub type PartitionFilter = (String, String, String);
/// - Batch size control (rows per batch)
/// - Time travel (as-of timestamp)
///
-/// # Current Limitations
+/// # Streaming Support
///
-/// Not all options are fully supported in streaming APIs:
-/// - `batch_size` and `partition_filters` are fully supported.
-/// - `projection` is passed through to the streaming implementation but is not yet
-/// applied at the parquet read level. This is because projection requires mapping
-/// column names to column indices via schema lookup, which is not yet implemented.
-/// - `row_predicate` is not yet implemented in streaming reads. The predicate function
-/// is accepted but will be ignored.
+/// All options are supported in streaming APIs:
+/// - `batch_size` controls the number of rows per batch
+/// - `partition_filters` prunes partitions before reading
+/// - `projection` pushes column selection to the parquet reader level
+/// - `row_predicate` filters rows after reading each batch
+/// - `as_of_timestamp` enables time travel queries
///
/// # Example
///
@@ -53,6 +56,7 @@ pub type PartitionFilter = (String, String, String);
///
/// let options = ReadOptions::new()
/// .with_filters([("city", "=", "san_francisco")])
+/// .with_projection(["id", "name", "city"])
/// .with_batch_size(4096);
/// ```
#[derive(Default)]
@@ -112,8 +116,8 @@ impl ReadOptions {
/// Sets the row-level predicate for filtering records.
///
- /// **Note:** Row predicates are not yet implemented in streaming reads.
- /// The predicate will be accepted but ignored during streaming operations.
+ /// The predicate function receives each `RecordBatch` and returns a `BooleanArray`
+ /// mask indicating which rows to keep. Rows where the mask is `true` are retained.
///
/// # Arguments
/// * `predicate` - A function that takes a RecordBatch and returns a BooleanArray mask
@@ -121,7 +125,7 @@ impl ReadOptions {
where
F: Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync + 'static,
{
- self.row_predicate = Some(Box::new(predicate));
+ self.row_predicate = Some(Arc::new(predicate));
self
}
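For illustration, a minimal sketch of a `RowPredicate` built by hand (the `RowPredicate` import path is an assumption); the `Arc` makes it cloneable per file-slice stream as described above:

    use std::sync::Arc;
    use arrow_array::{BooleanArray, Int32Array, RecordBatch};
    // Assumed re-export location, for illustration only.
    use hudi_core::table::read_options::RowPredicate;

    // Keep rows whose "id" is even, built without relying on a specific arrow compute kernel.
    let keep_even_ids: RowPredicate = Arc::new(|batch: &RecordBatch| {
        let ids = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .ok_or_else(|| hudi_core::error::CoreError::Schema("id column missing or not Int32".into()))?;
        Ok(ids
            .iter()
            .map(|v| Some(v.map(|x| x % 2 == 0).unwrap_or(false)))
            .collect::<BooleanArray>())
    });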
diff --git a/crates/core/tests/table_read_tests.rs b/crates/core/tests/table_read_tests.rs
index 9b36a6d..fa899d7 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -762,6 +762,7 @@ mod v8_tables {
/// These tests verify the streaming versions of snapshot and file slice reads.
mod streaming_queries {
use super::*;
+ use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use hudi_core::table::ReadOptions;
@@ -979,6 +980,164 @@ mod streaming_queries {
assert_eq!(count, 0, "Empty table should produce no batches");
Ok(())
}
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_projection() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Only request id and name columns (not isActive)
+ let options = ReadOptions::new().with_projection(["id", "name"]);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Verify only projected columns are returned
+ let schema = &batches[0].schema();
+ assert_eq!(schema.fields().len(), 2, "Should only have 2 columns");
+ assert!(
+ schema.field_with_name("id").is_ok(),
+ "Should have id column"
+ );
+ assert!(
+ schema.field_with_name("name").is_ok(),
+ "Should have name column"
+ );
+ assert!(
+ schema.field_with_name("isActive").is_err(),
+ "Should NOT have isActive column"
+ );
+
+ // Verify row count is still correct
+ let records = concat_batches(schema, &batches)?;
+ assert_eq!(records.num_rows(), 4, "Should have all 4 rows");
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_row_predicate() -> Result<()> {
+ use arrow::array::BooleanArray;
+
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Filter rows where isActive = true (Carol and Diana)
+ let options = ReadOptions::new().with_row_predicate(|batch: &RecordBatch| {
+ let col = batch
+ .column_by_name("isActive")
+ .ok_or_else(|| hudi_core::error::CoreError::Schema("isActive not found".into()))?;
+ let arr = col
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| hudi_core::error::CoreError::Schema("Not boolean".into()))?;
+ Ok(arr.clone())
+ });
+
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ let schema = &batches[0].schema();
+ let records = concat_batches(schema, &batches)?;
+
+ // Should only have Carol and Diana (isActive = true)
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(sample_data, vec![(3, "Carol", true), (4, "Diana", true)]);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_with_projection_and_row_predicate() -> Result<()> {
+ use arrow::array::{BooleanArray, Int32Array};
+
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Project only id and isActive, filter where isActive = true
+ let options = ReadOptions::new()
+ .with_projection(["id", "isActive"])
+ .with_row_predicate(|batch: &RecordBatch| {
+ let col = batch.column_by_name("isActive").ok_or_else(|| {
+ hudi_core::error::CoreError::Schema("isActive not
found".into())
+ })?;
+ let arr = col
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| hudi_core::error::CoreError::Schema("Not
boolean".into()))?;
+ Ok(arr.clone())
+ });
+
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ let mut batches = Vec::new();
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+
+ assert!(!batches.is_empty(), "Should produce at least one batch");
+
+ // Verify only projected columns
+ let schema = &batches[0].schema();
+ assert_eq!(schema.fields().len(), 2, "Should only have 2 columns");
+ assert!(schema.field_with_name("id").is_ok());
+ assert!(schema.field_with_name("isActive").is_ok());
+ assert!(schema.field_with_name("name").is_err());
+
+ // Verify filtered rows (only active users: Carol=3, Diana=4)
+ let records = concat_batches(schema, &batches)?;
+ assert_eq!(records.num_rows(), 2, "Should have 2 rows");
+
+ let ids = records
+ .column_by_name("id")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let id_values: Vec<i32> = ids.iter().flatten().collect();
+ assert!(id_values.contains(&3) && id_values.contains(&4));
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_read_snapshot_stream_projection_invalid_column() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await?;
+
+ // Request a non-existent column
+ let options = ReadOptions::new().with_projection(["id", "nonexistent_column"]);
+ let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+ // Error occurs when polling the stream (lazy evaluation)
+ let mut found_error = false;
+ while let Some(result) = stream.next().await {
+ match result {
+ Ok(_) => {}
+ Err(err) => {
+ assert!(
+ err.to_string().contains("nonexistent_column"),
+ "Error should mention the invalid column name, got:
{err}"
+ );
+ found_error = true;
+ break;
+ }
+ }
+ }
+ assert!(
+ found_error,
+ "Should have encountered an error for non-existent column"
+ );
+ Ok(())
+ }
}
/// Test module for tables with metadata table (MDT) enabled.
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index fadb942..be5b314 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -53,5 +53,8 @@ async-trait = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
+# logging
+log = { workspace = true }
+
[dev-dependencies]
hudi-test = { path = "../test" }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f502d26..aeb4e7b 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -43,6 +43,7 @@ use datafusion_common::DataFusionError::Execution;
use datafusion_common::config::TableParquetOptions;
use datafusion_expr::{CreateExternalTable, Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::create_physical_expr;
+use log::warn;
use crate::util::expr::exprs_to_filters;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
@@ -79,9 +80,31 @@ use hudi_core::table::Table as HudiTable;
/// Ok(())
/// }
/// ```
-#[derive(Clone, Debug)]
+/// A DataFusion table provider for Apache Hudi tables.
+#[derive(Clone)]
pub struct HudiDataSource {
table: Arc<HudiTable>,
+ /// Cached partition schema for determining partition columns.
+ /// This is cached at construction since partition schema rarely changes
+ /// and is needed synchronously in `supports_filters_pushdown`.
+ partition_schema: Schema,
+}
+
+impl std::fmt::Debug for HudiDataSource {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HudiDataSource")
+ .field("table", &self.table)
+ .field(
+ "partition_columns",
+ &self
+ .partition_schema
+ .fields()
+ .iter()
+ .map(|field| field.name())
+ .collect::<Vec<_>>(),
+ )
+ .finish()
+ }
}
impl HudiDataSource {
@@ -95,10 +118,23 @@ impl HudiDataSource {
K: AsRef<str>,
V: Into<String>,
{
- match HudiTable::new_with_options(base_uri, options).await {
- Ok(t) => Ok(Self { table: Arc::new(t) }),
- Err(e) => Err(Execution(format!("Failed to create Hudi table:
{e}"))),
- }
+ let table = HudiTable::new_with_options(base_uri, options)
+ .await
+ .map_err(|e| Execution(format!("Failed to create Hudi table:
{e}")))?;
+
+ // Cache partition schema at construction for use in supports_filters_pushdown
+ let partition_schema = match table.get_partition_schema().await {
+ Ok(s) => s,
+ Err(e) => {
+ warn!("Failed to get partition schema, using empty schema: {e}");
+ Schema::empty()
+ }
+ };
+
+ Ok(Self {
+ table: Arc::new(table),
+ partition_schema,
+ })
}
fn get_input_partitions(&self) -> usize {
@@ -110,21 +146,45 @@ impl HudiDataSource {
/// Check if the given expression can be pushed down to the Hudi table.
///
- /// The expression can be pushed down if it is a binary expression with a supported operator and operands.
+ /// The expression can be pushed down if it is:
+ /// - A binary expression with a supported operator and operands
+ /// - A NOT expression wrapping a pushable expression
+ /// - An AND compound expression where at least one side can be pushed down
+ /// - A BETWEEN expression with column and literals
fn can_push_down(&self, expr: &Expr) -> bool {
match expr {
Expr::BinaryExpr(binary_expr) => {
let left = &binary_expr.left;
let op = &binary_expr.op;
let right = &binary_expr.right;
- self.is_supported_operator(op)
- && self.is_supported_operand(left)
- && self.is_supported_operand(right)
+
+ match op {
+ Operator::And => {
+ // AND is pushable if at least one side is pushable
+ self.can_push_down(left) || self.can_push_down(right)
+ }
+ Operator::Or => {
+ // OR cannot be pushed down with current filter model
+ false
+ }
+ _ => {
+ self.is_supported_operator(op)
+ && self.is_supported_operand(left)
+ && self.is_supported_operand(right)
+ }
+ }
}
Expr::Not(inner_expr) => {
// Recursively check if the inner expression can be pushed down
self.can_push_down(inner_expr)
}
+ Expr::Between(between) => {
+ // BETWEEN can be pushed if expr is a column and bounds are literals
+ !between.negated
+ && matches!(&*between.expr, Expr::Column(_))
+ && matches!(&*between.low, Expr::Literal(..))
+ && matches!(&*between.high, Expr::Literal(..))
+ }
_ => false,
}
}
@@ -148,6 +208,55 @@ impl HudiDataSource {
_ => false,
}
}
+
+ /// Returns partition column names from partition schema.
+ fn get_partition_columns(&self) -> Vec<String> {
+ self.partition_schema
+ .fields()
+ .iter()
+ .map(|f| f.name().clone())
+ .collect()
+ }
+
+ /// Checks if expression filters on a partition column.
+ ///
+ /// Partition column filters can be marked as `Exact` because they are
+ /// fully handled by partition pruning and don't need post-filtering.
+ fn is_partition_column_filter(expr: &Expr, partition_cols: &[String]) -> bool {
+ match expr {
+ Expr::BinaryExpr(binary_expr) => {
+ match binary_expr.op {
+ Operator::And => {
+ // For AND, check if both sides are partition filters
+ Self::is_partition_column_filter(&binary_expr.left, partition_cols)
+ && Self::is_partition_column_filter(&binary_expr.right, partition_cols)
+ }
+ _ => {
+ // For partition filters, one side must be a partition column
+ // and the other side must be a literal value
+ match (&*binary_expr.left, &*binary_expr.right) {
+ (Expr::Column(col), Expr::Literal(..))
+ if partition_cols.contains(&col.name) =>
+ {
+ true
+ }
+ (Expr::Literal(..), Expr::Column(col))
+ if partition_cols.contains(&col.name) =>
+ {
+ true
+ }
+ _ => false,
+ }
+ }
+ }
+ }
+ Expr::Not(inner) => Self::is_partition_column_filter(inner, partition_cols),
+ Expr::Between(between) => {
+ matches!(&*between.expr, Expr::Column(col) if partition_cols.contains(&col.name))
+ }
+ _ => false,
+ }
+ }
}
#[async_trait]
@@ -241,13 +350,20 @@ impl TableProvider for HudiDataSource {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
+ let partition_cols = self.get_partition_columns();
+
filters
.iter()
.map(|expr| {
- if self.can_push_down(expr) {
- Ok(TableProviderFilterPushDown::Inexact)
+ if !self.can_push_down(expr) {
+ return Ok(TableProviderFilterPushDown::Unsupported);
+ }
+
+ // Partition column filters are fully handled by partition pruning
+ if Self::is_partition_column_filter(expr, &partition_cols) {
+ Ok(TableProviderFilterPushDown::Exact)
} else {
- Ok(TableProviderFilterPushDown::Unsupported)
+ Ok(TableProviderFilterPushDown::Inexact)
}
})
.collect()
@@ -337,28 +453,15 @@ impl TableProviderFactory for HudiTableFactory {
#[cfg(test)]
mod tests {
use super::*;
- use datafusion::execution::session_state::SessionStateBuilder;
- use datafusion::prelude::{SessionConfig, SessionContext};
- use datafusion_common::{Column, DataFusionError, ScalarValue};
+ use datafusion_common::{Column, ScalarValue};
use std::fs::canonicalize;
use std::path::Path;
- use std::sync::Arc;
use url::Url;
use datafusion::logical_expr::BinaryExpr;
- use hudi_core::config::read::HudiReadConfig::InputPartitions;
- use hudi_core::metadata::meta_field::MetaField;
- use hudi_test::SampleTable::{
- V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
- V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
- V6TimebasedkeygenNonhivestyle,
- };
- use hudi_test::assert_arrow_field_names_eq;
- use hudi_test::{SampleTable, util};
- use util::{get_bool_column, get_i32_column, get_str_column};
+ use hudi_test::SampleTable::{V6Nonpartitioned, V6SimplekeygenNonhivestyle};
use crate::HudiDataSource;
- use crate::HudiTableFactory;
#[tokio::test]
async fn get_default_input_partitions() {
@@ -369,196 +472,6 @@ mod tests {
assert_eq!(hudi.get_input_partitions(), 0)
}
- #[tokio::test]
- async fn test_get_create_schema_from_empty_table() {
- let table_provider =
- HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), empty_options())
- .await
- .unwrap();
- let schema = table_provider.schema();
- assert_arrow_field_names_eq!(
- schema,
- [MetaField::field_names(), vec!["id", "name", "isActive"]].concat()
- );
- }
-
- async fn register_test_table_with_session<I, K, V>(
- test_table: &SampleTable,
- options: I,
- use_sql: bool,
- ) -> Result<SessionContext, DataFusionError>
- where
- I: IntoIterator<Item = (K, V)>,
- K: AsRef<str>,
- V: Into<String>,
- {
- let ctx = create_test_session().await;
- if use_sql {
- let create_table_sql = format!(
- "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
- test_table.as_ref(),
- test_table.path_to_cow(),
- concat_as_sql_options(options)
- );
- ctx.sql(create_table_sql.as_str()).await?;
- } else {
- let base_url = test_table.url_to_cow();
- let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?;
- ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
- }
- Ok(ctx)
- }
-
- async fn create_test_session() -> SessionContext {
- let config = SessionConfig::new().set(
- "datafusion.sql_parser.enable_ident_normalization",
- &ScalarValue::from(false),
- );
- let table_factory: Arc<dyn TableProviderFactory> = Arc::new(HudiTableFactory::default());
-
- let session_state = SessionStateBuilder::new()
- .with_default_features()
- .with_config(config)
- .with_table_factories(HashMap::from([("HUDI".to_string(),
table_factory)]))
- .build();
-
- SessionContext::new_with_state(session_state)
- }
-
- fn concat_as_sql_options<I, K, V>(options: I) -> String
- where
- I: IntoIterator<Item = (K, V)>,
- K: AsRef<str>,
- V: Into<String>,
- {
- let kv_pairs: Vec<String> = options
- .into_iter()
- .map(|(k, v)| format!("'{}' '{}'", k.as_ref(), v.into()))
- .collect();
-
- if kv_pairs.is_empty() {
- String::new()
- } else {
- format!("OPTIONS ({})", kv_pairs.join(", "))
- }
- }
-
- #[tokio::test]
- async fn test_create_table_with_unknown_format() {
- let test_table = V6Nonpartitioned;
- let invalid_format = "UNKNOWN_FORMAT";
- let create_table_sql = format!(
- "CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
- test_table.as_ref(),
- invalid_format,
- test_table.path_to_cow()
- );
-
- let ctx = create_test_session().await;
- let result = ctx.sql(create_table_sql.as_str()).await;
- assert!(result.is_err());
- }
-
- async fn verify_plan(
- ctx: &SessionContext,
- sql: &str,
- table_name: &str,
- planned_input_partitioned: &i32,
- ) {
- let explaining_df = ctx.sql(sql).await.unwrap().explain(false, true).unwrap();
- let explaining_rb = explaining_df.collect().await.unwrap();
- let explaining_rb = explaining_rb.first().unwrap();
- let plan = get_str_column(explaining_rb, "plan").join("");
- let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
- assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
- assert!(plan_lines[2].starts_with(&format!(
- "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as isActive, \
- get_field(structField@3, field2) as {table_name}.structField[field2]]"
- )));
- assert!(plan_lines[4].starts_with(
- "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND get_field(structField@3, field2) > 30"
- ));
- assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
- }
-
- async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
- let df = ctx.sql(sql).await.unwrap();
- let rb = df.collect().await.unwrap();
- let rb = rb.first().unwrap();
- assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
- assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
- assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
- assert_eq!(
- get_i32_column(rb, &format!("{table_name}.structField[field2]")),
- &[40, 50]
- );
- }
-
- #[tokio::test]
- async fn test_datafusion_read_hudi_table_with_partition_filter_pushdown() {
- for (test_table, use_sql, planned_input_partitions) in &[
- (V6ComplexkeygenHivestyle, true, 2),
- (V6Nonpartitioned, true, 1),
- (V6SimplekeygenNonhivestyle, false, 2),
- (V6SimplekeygenHivestyleNoMetafields, true, 2),
- (V6TimebasedkeygenNonhivestyle, false, 2),
- ] {
- println!(">>> testing for {}", test_table.as_ref());
- let options = [(InputPartitions, "2")];
- let ctx = register_test_table_with_session(test_table, options, *use_sql)
- .await
- .unwrap();
-
- let sql = format!(
- r#"
- SELECT id, name, isActive, structField.field2
- FROM {} WHERE id % 2 = 0 AND name != 'Alice'
- AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
- test_table.as_ref()
- );
-
- verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
- verify_data(&ctx, &sql, test_table.as_ref()).await
- }
- }
-
- async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, table_name: &str) {
- let df = ctx.sql(sql).await.unwrap();
- let rb = df.collect().await.unwrap();
- let rb = rb.first().unwrap();
- assert_eq!(get_i32_column(rb, "id"), &[4]);
- assert_eq!(get_str_column(rb, "name"), &["Diana"]);
- assert_eq!(get_bool_column(rb, "isActive"), &[false]);
- assert_eq!(
- get_i32_column(rb, &format!("{table_name}.structField[field2]")),
- &[50]
- );
- }
-
- #[tokio::test]
- async fn test_datafusion_read_hudi_table_with_replacecommits_with_partition_filter_pushdown() {
- for (test_table, use_sql, planned_input_partitions) in
- &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
- {
- println!(">>> testing for {}", test_table.as_ref());
- let ctx =
- register_test_table_with_session(test_table, [(InputPartitions, "2")], *use_sql)
- .await
- .unwrap();
-
- let sql = format!(
- r#"
- SELECT id, name, isActive, structField.field2
- FROM {} WHERE id % 2 = 0 AND name != 'Alice'
- AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
- test_table.as_ref()
- );
-
- verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
- verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
- }
- }
-
#[tokio::test]
async fn test_supports_filters_pushdown() {
let table_provider = HudiDataSource::new_with_options(
@@ -612,6 +525,7 @@ mod tests {
let result = table_provider.supports_filters_pushdown(&filters).unwrap();
assert_eq!(result.len(), 6);
+ // Non-partitioned table - all filters are Inexact (no partition columns)
assert_eq!(result[0], TableProviderFilterPushDown::Inexact);
assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
assert_eq!(result[2], TableProviderFilterPushDown::Unsupported);
@@ -619,4 +533,113 @@ mod tests {
assert_eq!(result[4], TableProviderFilterPushDown::Unsupported);
assert_eq!(result[5], TableProviderFilterPushDown::Inexact);
}
+
+ #[tokio::test]
+ async fn test_supports_filters_pushdown_exact_for_partition_columns() {
+ // Use a partitioned table - byteField is the partition column
+ let table_provider = HudiDataSource::new_with_options(
+ V6SimplekeygenNonhivestyle.path_to_cow().as_str(),
+ empty_options(),
+ )
+ .await
+ .unwrap();
+
+ // Filter on partition column (byteField) - should be Exact
+ let partition_filter = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name("byteField".to_string()))),
+ op: Operator::Eq,
+ right: Box::new(Expr::Literal(ScalarValue::Int8(Some(1)), None)),
+ });
+
+ // Filter on non-partition column (name) - should be Inexact
+ let non_partition_filter = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
+ op: Operator::Eq,
+ right: Box::new(Expr::Literal(
+ ScalarValue::Utf8(Some("Alice".to_string())),
+ None,
+ )),
+ });
+
+ let filters = vec![&partition_filter, &non_partition_filter];
+ let result = table_provider.supports_filters_pushdown(&filters).unwrap();
+
+ assert_eq!(result.len(), 2);
+ // Partition column filter is Exact
+ assert_eq!(result[0], TableProviderFilterPushDown::Exact);
+ // Non-partition column filter is Inexact
+ assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
+ }
+
+ #[tokio::test]
+ async fn test_supports_filters_pushdown_and_between() {
+ let table_provider = HudiDataSource::new_with_options(
+ V6Nonpartitioned.path_to_cow().as_str(),
+ empty_options(),
+ )
+ .await
+ .unwrap();
+
+ // AND expression: name = 'Alice' AND intField > 100
+ let left = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
+ op: Operator::Eq,
+ right: Box::new(Expr::Literal(
+ ScalarValue::Utf8(Some("Alice".to_string())),
+ None,
+ )),
+ });
+ let right = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name("intField".to_string()))),
+ op: Operator::Gt,
+ right: Box::new(Expr::Literal(ScalarValue::Int32(Some(100)), None)),
+ });
+ let and_expr = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(left),
+ op: Operator::And,
+ right: Box::new(right),
+ });
+
+ // BETWEEN expression: intField BETWEEN 10 AND 100
+ let between_expr = Expr::Between(datafusion_expr::Between::new(
+ Box::new(Expr::Column(Column::from_name("intField".to_string()))),
+ false,
+ Box::new(Expr::Literal(ScalarValue::Int32(Some(10)), None)),
+ Box::new(Expr::Literal(ScalarValue::Int32(Some(100)), None)),
+ ));
+
+ // OR expression - should be unsupported
+ let or_left = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
+ op: Operator::Eq,
+ right: Box::new(Expr::Literal(
+ ScalarValue::Utf8(Some("Alice".to_string())),
+ None,
+ )),
+ });
+ let or_right = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
+ op: Operator::Eq,
+ right: Box::new(Expr::Literal(
+ ScalarValue::Utf8(Some("Bob".to_string())),
+ None,
+ )),
+ });
+ let or_expr = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(or_left),
+ op: Operator::Or,
+ right: Box::new(or_right),
+ });
+
+ let filters = vec![&and_expr, &between_expr, &or_expr];
+ let result = table_provider.supports_filters_pushdown(&filters).unwrap();
+
+ assert_eq!(result.len(), 3);
+ // AND expression is supported
+ assert_eq!(result[0], TableProviderFilterPushDown::Inexact);
+ // BETWEEN expression is supported
+ assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
+ // OR expression is not supported
+ assert_eq!(result[2], TableProviderFilterPushDown::Unsupported);
+ }
}
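For illustration, a minimal sketch of how the new pushdown classification plays out, assuming a table partitioned by `byteField` as in the tests above; the expression builders come from `datafusion_expr` and the expected results restate the logic of `supports_filters_pushdown`:

    use datafusion_expr::{col, lit};

    // byteField = 1             -> Exact       (fully answered by partition pruning)
    // name = 'Alice'            -> Inexact     (pruned where possible, re-filtered by DataFusion)
    // name = 'a' OR name = 'b'  -> Unsupported (OR cannot be pushed down)
    let partition_filter = col("byteField").eq(lit(1i8));
    let data_filter = col("name").eq(lit("Alice"));
    let or_filter = col("name").eq(lit("a")).or(col("name").eq(lit("b")));
    // provider.supports_filters_pushdown(&[&partition_filter, &data_filter, &or_filter])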
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 5f20216..0b68649 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -18,34 +18,83 @@
*/
use datafusion::logical_expr::Operator;
-use datafusion_expr::{BinaryExpr, Expr};
+use datafusion_expr::{Between, BinaryExpr, Expr};
use hudi_core::expr::filter::{Filter as HudiFilter, col};
+use log::{debug, warn};
-/// Converts DataFusion expressions into Hudi filters.
+/// Extracts pushdown-safe filters from DataFusion expressions for partition pruning.
///
-/// Takes a slice of DataFusion [`Expr`] and attempts to convert each expression
-/// into a [`HudiFilter`]. Only binary expressions and NOT expressions are currently supported.
+/// Takes a slice of DataFusion [`Expr`] and extracts filters that can be safely
+/// pushed down for partition pruning. The returned filters represent a **subset**
+/// of the original expression's constraints.
+///
+/// # Supported Expressions
+/// - Binary comparisons: `=`, `!=`, `<`, `>`, `<=`, `>=`
+/// - `NOT` expressions: negates inner binary expression
+/// - `AND` compound expressions: recursively flattens both sides
+/// - `BETWEEN` expressions: converts to `>= low AND <= high`
+///
+/// # OR Expression Handling
+///
+/// `OR` expressions cannot be represented in the current filter model and are
+/// **skipped**. This means expressions containing `OR` will be **partially
extracted**:
+///
+/// | Input Expression | Extracted Filters | Notes
|
+///
|-----------------------|-------------------|--------------------------------|
+/// | `A AND B` | `[A, B]` | Full extraction
|
+/// | `A OR B` | `[]` | OR skipped entirely
|
+/// | `A AND (B OR C)` | `[A]` | Only A extracted, OR skipped
|
+/// | `(A OR B) AND C` | `[C]` | Only C extracted, OR skipped
|
+///
+/// # Safety
+///
+/// This function is **safe for partition pruning** because:
+/// - Extracted filters are a weaker constraint (may match more rows than original)
+/// - Partitions that don't match extracted filters definitely don't match original
+/// - The original expression must still be applied to filter actual row data
+///
+/// **Callers must still apply the original expression for correctness.**
+/// The extracted filters are for optimization (pruning), not semantic equivalence.
///
/// # Arguments
/// * `exprs` - A slice of DataFusion expressions to convert
///
/// # Returns
-/// Returns `Some(Vec<HudiFilter>)` if at least one filter is successfully converted,
-/// otherwise returns `None`.
-///
-/// TODO: Handle other DataFusion [`Expr`]
+/// A vector of filter tuples `(field_name, operator, value)`. All returned filters
+/// are implicitly AND-ed together.
pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
exprs
.iter()
- .filter_map(|expr| match expr {
- Expr::BinaryExpr(binary_expr) => binary_expr_to_filter(binary_expr),
- Expr::Not(not_expr) => not_expr_to_filter(not_expr),
- _ => None,
- })
+ .flat_map(expr_to_filters)
.map(|filter| filter.into())
.collect()
}
+/// Recursively extracts pushdown-safe filters from a single expression.
+///
+/// OR expressions return empty (cannot be pushed down), which may result in
+/// partial extraction when OR is nested within AND expressions.
+fn expr_to_filters(expr: &Expr) -> Vec<HudiFilter> {
+ match expr {
+ Expr::BinaryExpr(binary_expr) => match binary_expr.op {
+ Operator::And => {
+ // Recursively flatten AND expressions
+ let mut filters = expr_to_filters(&binary_expr.left);
+ filters.extend(expr_to_filters(&binary_expr.right));
+ filters
+ }
+ Operator::Or => {
+ // Cannot represent OR in current filter model - skip
+ vec![]
+ }
+ _ => binary_expr_to_filter(binary_expr).into_iter().collect(),
+ },
+ Expr::Not(not_expr) => not_expr_to_filter(not_expr).into_iter().collect(),
+ Expr::Between(between) => between_to_filters(between),
+ _ => vec![],
+ }
+}
+
/// Converts a binary expression [`Expr::BinaryExpr`] into a [`HudiFilter`].
fn binary_expr_to_filter(binary_expr: &BinaryExpr) -> Option<HudiFilter> {
// extract the column and literal from the binary expression
@@ -81,6 +130,52 @@ fn not_expr_to_filter(not_expr: &Expr) -> Option<HudiFilter> {
}
}
+/// Converts a BETWEEN expression into two filters: >= low AND <= high.
+///
+/// If `negated` is true, returns empty (NOT BETWEEN is complex to represent).
+fn between_to_filters(between: &Between) -> Vec<HudiFilter> {
+ if between.negated {
+ debug!("NOT BETWEEN expressions cannot be pushed down");
+ return vec![];
+ }
+
+ // Extract column name from the expression
+ let column_name = match &*between.expr {
+ Expr::Column(col) => col.name.clone(),
+ _ => {
+ debug!("BETWEEN with non-column expression cannot be pushed down");
+ return vec![];
+ }
+ };
+
+ // Extract literal values from low and high bounds
+ let low_str = match &*between.low {
+ Expr::Literal(lit, _) => lit.to_string(),
+ _ => {
+ warn!(
+ "BETWEEN low bound is not a literal for column
'{column_name}', skipping pushdown"
+ );
+ return vec![];
+ }
+ };
+
+ let high_str = match &*between.high {
+ Expr::Literal(lit, _) => lit.to_string(),
+ _ => {
+ warn!(
+ "BETWEEN high bound is not a literal for column
'{column_name}', skipping pushdown"
+ );
+ return vec![];
+ }
+ };
+
+ // Create two filters: >= low AND <= high
+ vec![
+ col(&column_name).gte(low_str),
+ col(&column_name).lte(high_str),
+ ]
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -256,10 +351,11 @@ mod tests {
#[test]
fn test_convert_expr_with_unsupported_operator() {
+ // Modulo operator is not supported
let expr = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col("col")),
- Operator::And,
- Box::new(lit("value")),
+ Operator::Modulo,
+ Box::new(lit(2i32)),
));
let filters = vec![expr];
@@ -267,6 +363,94 @@ mod tests {
assert!(result.is_empty());
}
+ #[test]
+ fn test_convert_and_compound_expr() {
+ // Test: col1 = 'a' AND col2 = 'b' should produce two filters
+ let left = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col1")),
+ Operator::Eq,
+ Box::new(lit("a")),
+ ));
+ let right = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col2")),
+ Operator::Eq,
+ Box::new(lit("b")),
+ ));
+ let and_expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(left),
+ Operator::And,
+ Box::new(right),
+ ));
+
+ let result = exprs_to_filters(&[and_expr]);
+
+ assert_eq!(result.len(), 2);
+ assert_eq!(result[0].0, "col1");
+ assert_eq!(result[0].1, "=");
+ assert_eq!(result[0].2, "a");
+ assert_eq!(result[1].0, "col2");
+ assert_eq!(result[1].1, "=");
+ assert_eq!(result[1].2, "b");
+ }
+
+ #[test]
+ fn test_convert_or_expr_returns_empty() {
+ // OR expressions cannot be pushed down
+ let left = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col1")),
+ Operator::Eq,
+ Box::new(lit("a")),
+ ));
+ let right = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(col("col2")),
+ Operator::Eq,
+ Box::new(lit("b")),
+ ));
+ let or_expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(left),
+ Operator::Or,
+ Box::new(right),
+ ));
+
+ let result = exprs_to_filters(&[or_expr]);
+ assert!(result.is_empty());
+ }
+
+ #[test]
+ fn test_convert_between_expr() {
+ // Test: col BETWEEN 10 AND 20 should produce >= 10 AND <= 20
+ let between = Expr::Between(Between::new(
+ Box::new(col("count")),
+ false,
+ Box::new(lit(10i32)),
+ Box::new(lit(20i32)),
+ ));
+
+ let result = exprs_to_filters(&[between]);
+
+ assert_eq!(result.len(), 2);
+ assert_eq!(result[0].0, "count");
+ assert_eq!(result[0].1, ">=");
+ assert_eq!(result[0].2, "10");
+ assert_eq!(result[1].0, "count");
+ assert_eq!(result[1].1, "<=");
+ assert_eq!(result[1].2, "20");
+ }
+
+ #[test]
+ fn test_convert_not_between_returns_empty() {
+ // NOT BETWEEN cannot be represented in current filter model
+ let not_between = Expr::Between(Between::new(
+ Box::new(col("count")),
+ true, // negated
+ Box::new(lit(10i32)),
+ Box::new(lit(20i32)),
+ ));
+
+ let result = exprs_to_filters(&[not_between]);
+ assert!(result.is_empty());
+ }
+
#[test]
fn test_negate_operator_for_all_ops() {
for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
@@ -279,4 +463,196 @@ mod tests {
}
}
}
+
+ // =========================================================================
+ // Partial extraction tests for OR expressions
+ // =========================================================================
+ //
+ // These tests verify the documented behavior: OR expressions cannot be
+ // pushed down, so expressions containing OR are partially extracted.
+ // This is safe for partition pruning (extracted filters are weaker
+ // constraints) but callers must apply original expressions for correctness.
+
+ #[test]
+ fn test_partial_extraction_and_with_or_on_right() {
+ // Test: A AND (B OR C) should extract only [A]
+ // The OR subtree is skipped, leaving only the left AND operand
+ let a = col("col_a").eq(lit("a"));
+ let b = col("col_b").eq(lit("b"));
+ let c = col("col_c").eq(lit("c"));
+
+ // Build: (B OR C)
+ let b_or_c = Expr::BinaryExpr(BinaryExpr::new(Box::new(b), Operator::Or, Box::new(c)));
+
+ // Build: A AND (B OR C)
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(a),
+ Operator::And,
+ Box::new(b_or_c),
+ ));
+
+ let result = exprs_to_filters(&[expr]);
+
+ // Only A should be extracted; (B OR C) is skipped
+ assert_eq!(
+ result.len(),
+ 1,
+ "Expected only 1 filter (A), OR subtree skipped"
+ );
+ assert_eq!(result[0].0, "col_a");
+ assert_eq!(result[0].1, "=");
+ assert_eq!(result[0].2, "a");
+ }
+
+ #[test]
+ fn test_partial_extraction_and_with_or_on_left() {
+ // Test: (A OR B) AND C should extract only [C]
+ // The OR subtree is skipped, leaving only the right AND operand
+ let a = col("col_a").eq(lit("a"));
+ let b = col("col_b").eq(lit("b"));
+ let c = col("col_c").eq(lit("c"));
+
+ // Build: (A OR B)
+ let a_or_b = Expr::BinaryExpr(BinaryExpr::new(Box::new(a), Operator::Or, Box::new(b)));
+
+ // Build: (A OR B) AND C
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(a_or_b),
+ Operator::And,
+ Box::new(c),
+ ));
+
+ let result = exprs_to_filters(&[expr]);
+
+ // Only C should be extracted; (A OR B) is skipped
+ assert_eq!(
+ result.len(),
+ 1,
+ "Expected only 1 filter (C), OR subtree skipped"
+ );
+ assert_eq!(result[0].0, "col_c");
+ assert_eq!(result[0].1, "=");
+ assert_eq!(result[0].2, "c");
+ }
+
+ #[test]
+ fn test_partial_extraction_complex_and_or_mix() {
+ // Test: (A AND B) AND (C OR D) should extract [A, B]
+ // The left AND subtree is fully extracted, right OR subtree is skipped
+ let a = col("col_a").eq(lit("a"));
+ let b = col("col_b").eq(lit("b"));
+ let c = col("col_c").eq(lit("c"));
+ let d = col("col_d").eq(lit("d"));
+
+ // Build: (A AND B)
+ let a_and_b = Expr::BinaryExpr(BinaryExpr::new(Box::new(a), Operator::And, Box::new(b)));
+
+ // Build: (C OR D)
+ let c_or_d = Expr::BinaryExpr(BinaryExpr::new(Box::new(c), Operator::Or, Box::new(d)));
+
+ // Build: (A AND B) AND (C OR D)
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(a_and_b),
+ Operator::And,
+ Box::new(c_or_d),
+ ));
+
+ let result = exprs_to_filters(&[expr]);
+
+ // A and B should be extracted; (C OR D) is skipped
+ assert_eq!(
+ result.len(),
+ 2,
+ "Expected 2 filters (A, B), OR subtree skipped"
+ );
+ assert_eq!(result[0].0, "col_a");
+ assert_eq!(result[1].0, "col_b");
+ }
+
+ #[test]
+ fn test_partial_extraction_or_both_sides_skipped() {
+ // Test: (A OR B) AND (C OR D) should extract []
+ // Both sides are OR, so nothing can be extracted
+ let a = col("col_a").eq(lit("a"));
+ let b = col("col_b").eq(lit("b"));
+ let c = col("col_c").eq(lit("c"));
+ let d = col("col_d").eq(lit("d"));
+
+ let a_or_b = Expr::BinaryExpr(BinaryExpr::new(Box::new(a), Operator::Or, Box::new(b)));
+
+ let c_or_d = Expr::BinaryExpr(BinaryExpr::new(Box::new(c), Operator::Or, Box::new(d)));
+
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(a_or_b),
+ Operator::And,
+ Box::new(c_or_d),
+ ));
+
+ let result = exprs_to_filters(&[expr]);
+
+ // Both sides are OR, nothing extracted
+ assert!(
+ result.is_empty(),
+ "Expected empty result when both AND operands are OR"
+ );
+ }
+
+ #[test]
+ fn test_partial_extraction_deeply_nested() {
+ // Test: A AND (B AND (C OR D)) should extract [A, B]
+ // Nested AND is flattened, nested OR is skipped
+ let a = col("col_a").eq(lit("a"));
+ let b = col("col_b").eq(lit("b"));
+ let c = col("col_c").eq(lit("c"));
+ let d = col("col_d").eq(lit("d"));
+
+ // Build: (C OR D)
+ let c_or_d = Expr::BinaryExpr(BinaryExpr::new(Box::new(c), Operator::Or, Box::new(d)));
+
+ // Build: B AND (C OR D)
+ let b_and_c_or_d = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(b),
+ Operator::And,
+ Box::new(c_or_d),
+ ));
+
+ // Build: A AND (B AND (C OR D))
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(a),
+ Operator::And,
+ Box::new(b_and_c_or_d),
+ ));
+
+ let result = exprs_to_filters(&[expr]);
+
+ // A and B should be extracted from nested ANDs; (C OR D) is skipped
+ assert_eq!(
+ result.len(),
+ 2,
+ "Expected 2 filters (A, B) from nested ANDs"
+ );
+ assert_eq!(result[0].0, "col_a");
+ assert_eq!(result[1].0, "col_b");
+ }
+
+ #[test]
+ fn test_partial_extraction_multiple_input_exprs() {
+ // Test: Multiple expressions in input slice
+ // Input: [A, (B OR C)] should extract [A] (B OR C skipped)
+ let a = col("col_a").eq(lit("a"));
+ let b = col("col_b").eq(lit("b"));
+ let c = col("col_c").eq(lit("c"));
+
+ let b_or_c = Expr::BinaryExpr(BinaryExpr::new(Box::new(b), Operator::Or, Box::new(c)));
+
+ let result = exprs_to_filters(&[a, b_or_c]);
+
+ // Only A from first expr; second expr (B OR C) is skipped entirely
+ assert_eq!(
+ result.len(),
+ 1,
+ "Expected 1 filter from first expr, OR expr skipped"
+ );
+ assert_eq!(result[0].0, "col_a");
+ }
}
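For illustration, a minimal sketch of the partial-extraction behavior documented above; the `util::expr` path is an assumption about module visibility, and the builders come from `datafusion_expr`:

    use datafusion_expr::{col, lit};
    // Assumed public path, for illustration only.
    use hudi_datafusion::util::expr::exprs_to_filters;

    // city = 'sf' AND (id = 1 OR id = 2)
    let expr = col("city")
        .eq(lit("sf"))
        .and(col("id").eq(lit(1i32)).or(col("id").eq(lit(2i32))));

    // Only the AND-ed, OR-free constraint survives: [("city", "=", "sf")].
    // The OR subtree is skipped, so the original expression must still be evaluated on rows.
    let filters = exprs_to_filters(&[expr]);
    assert_eq!(filters.len(), 1);
    assert_eq!(filters[0], ("city".to_string(), "=".to_string(), "sf".to_string()));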
diff --git a/crates/datafusion/tests/read_tests.rs b/crates/datafusion/tests/read_tests.rs
new file mode 100644
index 0000000..5886c4f
--- /dev/null
+++ b/crates/datafusion/tests/read_tests.rs
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! E2E tests for DataFusion integration with Hudi tables.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::catalog::TableProviderFactory;
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result;
+use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{DataFusionError, ScalarValue};
+
+use hudi_core::config::read::HudiReadConfig::InputPartitions;
+use hudi_core::config::util::empty_options;
+use hudi_core::metadata::meta_field::MetaField;
+use hudi_datafusion::{HudiDataSource, HudiTableFactory};
+use hudi_test::util::{get_bool_column, get_i32_column, get_str_column};
+use hudi_test::{SampleTable, assert_arrow_field_names_eq};
+
+// ============================================================================
+// Helper Functions
+// ============================================================================
+
+async fn create_test_session() -> SessionContext {
+ let config = SessionConfig::new().set(
+ "datafusion.sql_parser.enable_ident_normalization",
+ &ScalarValue::from(false),
+ );
+ let table_factory: Arc<dyn TableProviderFactory> = Arc::new(HudiTableFactory::default());
+
+ let session_state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_config(config)
+ .with_table_factories(HashMap::from([("HUDI".to_string(),
table_factory)]))
+ .build();
+
+ SessionContext::new_with_state(session_state)
+}
+
+async fn register_test_table_with_session<I, K, V>(
+ test_table: &SampleTable,
+ options: I,
+ use_sql: bool,
+) -> Result<SessionContext, DataFusionError>
+where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+{
+ let ctx = create_test_session().await;
+ if use_sql {
+ let create_table_sql = format!(
+ "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
+ test_table.as_ref(),
+ test_table.path_to_cow(),
+ concat_as_sql_options(options)
+ );
+ ctx.sql(create_table_sql.as_str()).await?;
+ } else {
+ let base_url = test_table.url_to_cow();
+ let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?;
+ ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
+ }
+ Ok(ctx)
+}
+
+/// Register a table with the given session using direct registration (not SQL).
+async fn register_table_direct<I, K, V>(
+ test_table: &SampleTable,
+ options: I,
+) -> Result<SessionContext, DataFusionError>
+where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+{
+ let ctx = create_test_session().await;
+ let base_url = test_table.url_to_cow();
+ let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?;
+ ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
+ Ok(ctx)
+}
+
+fn concat_as_sql_options<I, K, V>(options: I) -> String
+where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+{
+ let kv_pairs: Vec<String> = options
+ .into_iter()
+ .map(|(k, v)| format!("'{}' '{}'", k.as_ref(), v.into()))
+ .collect();
+
+ if kv_pairs.is_empty() {
+ String::new()
+ } else {
+ format!("OPTIONS ({})", kv_pairs.join(", "))
+ }
+}
+
+async fn verify_plan(
+ ctx: &SessionContext,
+ sql: &str,
+ table_name: &str,
+ planned_input_partitioned: &i32,
+) {
+ let explaining_df = ctx.sql(sql).await.unwrap().explain(false, true).unwrap();
+ let explaining_rb = explaining_df.collect().await.unwrap();
+ let explaining_rb = explaining_rb.first().unwrap();
+ let plan = get_str_column(explaining_rb, "plan").join("");
+ let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
+ assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
+ assert!(plan_lines[2].starts_with(&format!(
+ "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as
isActive, \
+ get_field(structField@3, field2) as {table_name}.structField[field2]]"
+ )));
+ assert!(plan_lines[4].starts_with(
+ "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND
get_field(structField@3, field2) > 30"
+ ));
+
assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
+}
+
+async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
+ let df = ctx.sql(sql).await.unwrap();
+ let rb = df.collect().await.unwrap();
+ let rb = rb.first().unwrap();
+ assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
+ assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
+ assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
+ assert_eq!(
+ get_i32_column(rb, &format!("{table_name}.structField[field2]")),
+ &[40, 50]
+ );
+}
+
+async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, table_name: &str) {
+ let df = ctx.sql(sql).await.unwrap();
+ let rb = df.collect().await.unwrap();
+ let rb = rb.first().unwrap();
+ assert_eq!(get_i32_column(rb, "id"), &[4]);
+ assert_eq!(get_str_column(rb, "name"), &["Diana"]);
+ assert_eq!(get_bool_column(rb, "isActive"), &[false]);
+ assert_eq!(
+ get_i32_column(rb, &format!("{table_name}.structField[field2]")),
+ &[50]
+ );
+}
+
+// ============================================================================
+// V6 Table Tests (moved from lib.rs)
+// ============================================================================
+
+mod v6_tests {
+ use super::*;
+ use hudi_test::SampleTable::{
+ V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
+ V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
+ V6TimebasedkeygenNonhivestyle,
+ };
+
+ #[tokio::test]
+ async fn test_get_create_schema_from_empty_table() {
+ let table_provider =
+ HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), empty_options())
+ .await
+ .unwrap();
+ let schema = table_provider.schema();
+ assert_arrow_field_names_eq!(
+ schema,
+ [MetaField::field_names(), vec!["id", "name", "isActive"]].concat()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_create_table_with_unknown_format() {
+ let test_table = V6Nonpartitioned;
+ let invalid_format = "UNKNOWN_FORMAT";
+ let create_table_sql = format!(
+ "CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
+ test_table.as_ref(),
+ invalid_format,
+ test_table.path_to_cow()
+ );
+
+ let ctx = create_test_session().await;
+ let result = ctx.sql(create_table_sql.as_str()).await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_datafusion_read_hudi_table_with_partition_filter_pushdown() {
+ for (test_table, use_sql, planned_input_partitions) in &[
+ (V6ComplexkeygenHivestyle, true, 2),
+ (V6Nonpartitioned, true, 1),
+ (V6SimplekeygenNonhivestyle, false, 2),
+ (V6SimplekeygenHivestyleNoMetafields, true, 2),
+ (V6TimebasedkeygenNonhivestyle, false, 2),
+ ] {
+ println!(">>> testing for {}", test_table.as_ref());
+ let options = [(InputPartitions, "2")];
+ let ctx = register_test_table_with_session(test_table, options, *use_sql)
+ .await
+ .unwrap();
+
+ let sql = format!(
+ r#"
+ SELECT id, name, isActive, structField.field2
+ FROM {} WHERE id % 2 = 0 AND name != 'Alice'
+ AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+ test_table.as_ref()
+ );
+
+ verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
+ verify_data(&ctx, &sql, test_table.as_ref()).await
+ }
+ }
+
+ #[tokio::test]
+ async fn test_datafusion_read_hudi_table_with_replacecommits_with_partition_filter_pushdown() {
+ for (test_table, use_sql, planned_input_partitions) in
+ &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
+ {
+ println!(">>> testing for {}", test_table.as_ref());
+ let ctx =
+ register_test_table_with_session(test_table, [(InputPartitions, "2")], *use_sql)
+ .await
+ .unwrap();
+
+ let sql = format!(
+ r#"
+ SELECT id, name, isActive, structField.field2
+ FROM {} WHERE id % 2 = 0 AND name != 'Alice'
+ AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+ test_table.as_ref()
+ );
+
+ verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
+ verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
+ }
+ }
+}
+
+// ============================================================================
+// V8 Table Tests (new)
+// ============================================================================
+
+mod v8_tests {
+ use super::*;
+ use hudi_test::SampleTable::{
+ V8ComplexkeygenHivestyle, V8Nonpartitioned, V8SimplekeygenNonhivestyle,
+ };
+
+ #[tokio::test]
+ async fn test_v8_nonpartitioned_read() {
+ let test_table = V8Nonpartitioned;
+ println!(">>> testing V8 for {}", test_table.as_ref());
+
+ let ctx = register_table_direct(&test_table, [(InputPartitions, "2")])
+ .await
+ .unwrap();
+
+ // Verify schema
+ let df = ctx
+ .sql(&format!("SELECT * FROM {} LIMIT 1", test_table.as_ref()))
+ .await
+ .unwrap();
+ let schema = df.schema();
+ // V8 tables should have the expected columns
+ assert!(schema.field_with_name(None, "id").is_ok());
+ assert!(schema.field_with_name(None, "name").is_ok());
+ assert!(schema.field_with_name(None, "isActive").is_ok());
+
+ // Verify data read with filters
+ let sql = format!(
+ r#"SELECT id, name, isActive FROM {} WHERE id > 0 ORDER BY id"#,
+ test_table.as_ref()
+ );
+ let df = ctx.sql(&sql).await.unwrap();
+ let rb = df.collect().await.unwrap();
+ assert!(!rb.is_empty(), "Should return data from V8 table");
+
+ // Verify plan includes DataSourceExec
+ let explaining_df = ctx.sql(&sql).await.unwrap().explain(false, true).unwrap();
+ let explaining_rb = explaining_df.collect().await.unwrap();
+ let explaining_rb = explaining_rb.first().unwrap();
+ let plan = get_str_column(explaining_rb, "plan").join("");
+ assert!(
+ plan.contains("DataSourceExec"),
+ "Plan should contain DataSourceExec"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_v8_partitioned_filter_pushdown() {
+ let test_table = V8SimplekeygenNonhivestyle;
+ println!(">>> testing V8 for {}", test_table.as_ref());
+
+ let ctx = register_table_direct(&test_table, [(InputPartitions, "2")])
+ .await
+ .unwrap();
+
+ let sql = format!(
+ r#"
+ SELECT id, name, isActive, structField.field2
+ FROM {} WHERE id % 2 = 0 AND name != 'Alice'
+ AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+ test_table.as_ref()
+ );
+
+ // Verify plan
+ let explaining_df = ctx.sql(&sql).await.unwrap().explain(false, true).unwrap();
+ let explaining_rb = explaining_df.collect().await.unwrap();
+ let explaining_rb = explaining_rb.first().unwrap();
+ let plan = get_str_column(explaining_rb, "plan").join("");
+ let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
+
+ // Verify execution plan structure
+ assert!(
+ plan_lines[1].starts_with("SortExec: TopK(fetch=10)"),
+ "Should have TopK sort"
+ );
+ assert!(
+ plan_lines[2].contains("ProjectionExec"),
+ "Should have ProjectionExec"
+ );
+ assert!(
+ plan.contains("FilterExec"),
+ "Should have FilterExec for non-partition filters"
+ );
+ assert!(
+ plan.contains("input_partitions=2"),
+ "Should have input_partitions=2"
+ );
+
+ // Verify data
+ let df = ctx.sql(&sql).await.unwrap();
+ let rb = df.collect().await.unwrap();
+ let rb = rb.first().unwrap();
+ assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
+ assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
+ assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
+ }
+
+ #[tokio::test]
+ async fn test_v8_complex_keygen() {
+ let test_table = V8ComplexkeygenHivestyle;
+ println!(">>> testing V8 for {}", test_table.as_ref());
+
+ let ctx = register_table_direct(&test_table, [(InputPartitions, "2")])
+ .await
+ .unwrap();
+
+ let sql = format!(
+ r#"
+ SELECT id, name, isActive, structField.field2
+ FROM {} WHERE id % 2 = 0 AND name != 'Alice'
+ AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+ test_table.as_ref()
+ );
+
+ // Verify plan has correct input partitions for complex keygen
+ let explaining_df = ctx.sql(&sql).await.unwrap().explain(false, true).unwrap();
+ let explaining_rb = explaining_df.collect().await.unwrap();
+ let explaining_rb = explaining_rb.first().unwrap();
+ let plan = get_str_column(explaining_rb, "plan").join("");
+
+ assert!(
+ plan.contains("input_partitions=2"),
+ "Complex keygen table should have input_partitions=2"
+ );
+ assert!(
+ plan.contains("DataSourceExec"),
+ "Plan should contain DataSourceExec"
+ );
+
+ // Verify data
+ let df = ctx.sql(&sql).await.unwrap();
+ let rb = df.collect().await.unwrap();
+ let rb = rb.first().unwrap();
+ assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
+ assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
+ assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
+ }
+}