This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 98a8bc0  feat: support column project and row filter pushdown (#510)
98a8bc0 is described below

commit 98a8bc0768893dc973a0776068eea01dd455d8ee
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jan 5 09:46:25 2026 -0600

    feat: support column project and row filter pushdown (#510)
---
 crates/core/src/file_group/reader.rs  |  56 ++++-
 crates/core/src/storage/error.rs      |   3 +
 crates/core/src/storage/mod.rs        |  34 ++-
 crates/core/src/table/mod.rs          |   5 +-
 crates/core/src/table/read_options.rs |  28 ++-
 crates/core/tests/table_read_tests.rs | 159 ++++++++++++
 crates/datafusion/Cargo.toml          |   3 +
 crates/datafusion/src/lib.rs          | 457 ++++++++++++++++++----------------
 crates/datafusion/src/util/expr.rs    | 406 ++++++++++++++++++++++++++++--
 crates/datafusion/tests/read_tests.rs | 404 ++++++++++++++++++++++++++++++
 10 files changed, 1293 insertions(+), 262 deletions(-)

diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 005b48f..8de9d86 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -373,10 +373,10 @@ impl FileGroupReader {
 
     /// Reads a base file as a stream of record batches.
     ///
-    /// # Limitations
-    ///
-    /// Currently only `batch_size` from [ReadOptions] is used. The 
`projection` and
-    /// `row_predicate` fields are not yet implemented.
+    /// Supports the following [ReadOptions]:
+    /// - `batch_size`: Controls the number of rows per batch
+    /// - `projection`: Pushes column selection to the parquet reader level
+    /// - `row_predicate`: Filters rows after reading each batch
     async fn read_base_file_stream(
         &self,
         relative_path: &str,
@@ -400,10 +400,17 @@ impl FileGroupReader {
             .get_or_default(HudiReadConfig::StreamBatchSize)
             .into();
         let batch_size = options.batch_size.unwrap_or(default_batch_size);
-        let parquet_options = 
ParquetReadOptions::new().with_batch_size(batch_size);
+        let mut parquet_options = 
ParquetReadOptions::new().with_batch_size(batch_size);
+
+        // Add projection pushdown using column names (converted to indices 
internally
+        // by get_parquet_file_stream using the same schema the projection is 
applied to)
+        if let Some(ref projection_names) = options.projection {
+            parquet_options = 
parquet_options.with_projection(projection_names.clone());
+        }
 
         let hudi_configs = self.hudi_configs.clone();
         let path = relative_path.to_string();
+        let row_predicate = options.row_predicate.clone();
 
         let parquet_stream = self
             .storage
@@ -411,19 +418,36 @@ impl FileGroupReader {
             .map_err(|e| ReadFileSliceError(format!("Failed to read path 
{path}: {e:?}")))
             .await?;
 
-        // Apply the same filtering logic as read_file_slice_by_base_file_path
+        // Apply filtering: commit time filter first, then row predicate
         let stream = parquet_stream.into_stream().filter_map(move |result| {
             let hudi_configs = hudi_configs.clone();
+            let row_predicate = row_predicate.clone();
             async move {
                 match result {
                     Err(e) => Some(Err(ReadFileSliceError(format!(
                         "Failed to read batch: {e:?}"
                     )))),
-                    Ok(batch) => match apply_commit_time_filter(&hudi_configs, 
batch) {
-                        Err(e) => Some(Err(e)),
-                        Ok(filtered) if filtered.num_rows() > 0 => 
Some(Ok(filtered)),
-                        Ok(_) => None,
-                    },
+                    Ok(batch) => {
+                        // Apply commit time filter
+                        let filtered = match 
apply_commit_time_filter(&hudi_configs, batch) {
+                            Err(e) => return Some(Err(e)),
+                            Ok(b) if b.num_rows() == 0 => return None,
+                            Ok(b) => b,
+                        };
+
+                        // Apply row predicate if present
+                        let final_batch = if let Some(ref predicate) = 
row_predicate {
+                            match apply_row_predicate(predicate.as_ref(), 
filtered) {
+                                Err(e) => return Some(Err(e)),
+                                Ok(b) if b.num_rows() == 0 => return None,
+                                Ok(b) => b,
+                            }
+                        } else {
+                            filtered
+                        };
+
+                        Some(Ok(final_batch))
+                    }
                 }
             }
         });
@@ -595,6 +619,16 @@ fn apply_commit_time_filter(hudi_configs: &HudiConfigs, 
batch: RecordBatch) -> R
     }
 }
 
+/// Apply a row predicate to filter records in a batch.
+fn apply_row_predicate(
+    predicate: &(dyn Fn(&RecordBatch) -> Result<BooleanArray> + Send + Sync),
+    batch: RecordBatch,
+) -> Result<RecordBatch> {
+    let mask = predicate(&batch)?;
+    filter_record_batch(&batch, &mask)
+        .map_err(|e| ReadFileSliceError(format!("Failed to apply row 
predicate: {e:?}")))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/core/src/storage/error.rs b/crates/core/src/storage/error.rs
index a29dd6a..f6c28f0 100644
--- a/crates/core/src/storage/error.rs
+++ b/crates/core/src/storage/error.rs
@@ -31,6 +31,9 @@ pub enum StorageError {
     #[error("Invalid path: {0}")]
     InvalidPath(String),
 
+    #[error("Invalid column: {0}")]
+    InvalidColumn(String),
+
     #[error(transparent)]
     ObjectStoreError(#[from] object_store::Error),
 
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 14af978..e9650d9 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -54,8 +54,8 @@ pub mod util;
 pub struct ParquetReadOptions {
     /// Target batch size (number of rows per batch).
     pub batch_size: Option<usize>,
-    /// Column projection (indices of columns to read).
-    pub projection: Option<Vec<usize>>,
+    /// Column projection by names.
+    pub projection: Option<Vec<String>>,
 }
 
 impl ParquetReadOptions {
@@ -68,8 +68,13 @@ impl ParquetReadOptions {
         self
     }
 
-    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
-        self.projection = Some(projection);
+    /// Sets column projection by column names.
+    pub fn with_projection<I, S>(mut self, columns: I) -> Self
+    where
+        I: IntoIterator<Item = S>,
+        S: Into<String>,
+    {
+        self.projection = Some(columns.into_iter().map(|s| 
s.into()).collect());
         self
     }
 }
@@ -297,7 +302,26 @@ impl Storage {
             builder = builder.with_batch_size(batch_size);
         }
 
-        if let Some(projection) = options.projection {
+        // Handle projection: convert column names to indices using builder's 
schema
+        if let Some(ref column_names) = options.projection {
+            let arrow_schema = builder.schema();
+            let projection: Vec<usize> = column_names
+                .iter()
+                .map(|name| {
+                    arrow_schema.index_of(name).map_err(|_| {
+                        let available = arrow_schema
+                            .fields()
+                            .iter()
+                            .map(|f| f.name().as_str())
+                            .collect::<Vec<_>>()
+                            .join(", ");
+                        StorageError::InvalidColumn(format!(
+                            "Column '{name}' not found in parquet file schema. 
Available columns: [{available}]"
+                        ))
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
             let projection_mask = parquet::arrow::ProjectionMask::roots(
                 builder.parquet_schema(),
                 projection.iter().copied(),
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index b85c2b5..f02fd3e 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -911,17 +911,18 @@ impl Table {
         )])?;
 
         // Extract options to pass to each file slice read.
-        // Note: row_predicate is not yet supported in streaming base file 
reads.
         let batch_size = options.batch_size;
         let projection = options.projection.clone();
+        let row_predicate = options.row_predicate.clone();
 
         let streams_iter = file_slices.into_iter().map(move |file_slice| {
             let fg_reader = fg_reader.clone();
             let projection = projection.clone();
+            let row_predicate = row_predicate.clone();
             let options = ReadOptions {
                 partition_filters: vec![],
                 projection,
-                row_predicate: None, // Not yet implemented in streaming reads
+                row_predicate,
                 batch_size,
                 as_of_timestamp: None,
             };
diff --git a/crates/core/src/table/read_options.rs 
b/crates/core/src/table/read_options.rs
index 499546d..f2ca99d 100644
--- a/crates/core/src/table/read_options.rs
+++ b/crates/core/src/table/read_options.rs
@@ -18,10 +18,14 @@
  */
 //! Read options for streaming reads.
 
+use std::sync::Arc;
+
 use arrow_array::{BooleanArray, RecordBatch};
 
 /// A row-level predicate function for filtering records.
-pub type RowPredicate = Box<dyn Fn(&RecordBatch) -> 
crate::Result<BooleanArray> + Send + Sync>;
+///
+/// Uses `Arc` instead of `Box` to allow cloning for async streaming contexts.
+pub type RowPredicate = Arc<dyn Fn(&RecordBatch) -> 
crate::Result<BooleanArray> + Send + Sync>;
 
 /// A partition filter tuple: (field_name, operator, value).
 /// Example: ("city", "=", "san_francisco")
@@ -36,15 +40,14 @@ pub type PartitionFilter = (String, String, String);
 /// - Batch size control (rows per batch)
 /// - Time travel (as-of timestamp)
 ///
-/// # Current Limitations
+/// # Streaming Support
 ///
-/// Not all options are fully supported in streaming APIs:
-/// - `batch_size` and `partition_filters` are fully supported.
-/// - `projection` is passed through to the streaming implementation but is 
not yet
-///   applied at the parquet read level. This is because projection requires 
mapping
-///   column names to column indices via schema lookup, which is not yet 
implemented.
-/// - `row_predicate` is not yet implemented in streaming reads. The predicate 
function
-///   is accepted but will be ignored.
+/// All options are supported in streaming APIs:
+/// - `batch_size` controls the number of rows per batch
+/// - `partition_filters` prunes partitions before reading
+/// - `projection` pushes column selection to the parquet reader level
+/// - `row_predicate` filters rows after reading each batch
+/// - `as_of_timestamp` enables time travel queries
 ///
 /// # Example
 ///
@@ -53,6 +56,7 @@ pub type PartitionFilter = (String, String, String);
 ///
 /// let options = ReadOptions::new()
 ///     .with_filters([("city", "=", "san_francisco")])
+///     .with_projection(["id", "name", "city"])
 ///     .with_batch_size(4096);
 /// ```
 #[derive(Default)]
@@ -112,8 +116,8 @@ impl ReadOptions {
 
     /// Sets the row-level predicate for filtering records.
     ///
-    /// **Note:** Row predicates are not yet implemented in streaming reads.
-    /// The predicate will be accepted but ignored during streaming operations.
+    /// The predicate function receives each `RecordBatch` and returns a 
`BooleanArray`
+    /// mask indicating which rows to keep. Rows where the mask is `true` are 
retained.
     ///
     /// # Arguments
     /// * `predicate` - A function that takes a RecordBatch and returns a 
BooleanArray mask
@@ -121,7 +125,7 @@ impl ReadOptions {
     where
         F: Fn(&RecordBatch) -> crate::Result<BooleanArray> + Send + Sync + 
'static,
     {
-        self.row_predicate = Some(Box::new(predicate));
+        self.row_predicate = Some(Arc::new(predicate));
         self
     }
 
diff --git a/crates/core/tests/table_read_tests.rs 
b/crates/core/tests/table_read_tests.rs
index 9b36a6d..fa899d7 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -762,6 +762,7 @@ mod v8_tables {
 /// These tests verify the streaming versions of snapshot and file slice reads.
 mod streaming_queries {
     use super::*;
+    use arrow::record_batch::RecordBatch;
     use futures::StreamExt;
     use hudi_core::table::ReadOptions;
 
@@ -979,6 +980,164 @@ mod streaming_queries {
         assert_eq!(count, 0, "Empty table should produce no batches");
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_projection() -> Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Only request id and name columns (not isActive)
+        let options = ReadOptions::new().with_projection(["id", "name"]);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce at least one batch");
+
+        // Verify only projected columns are returned
+        let schema = &batches[0].schema();
+        assert_eq!(schema.fields().len(), 2, "Should only have 2 columns");
+        assert!(
+            schema.field_with_name("id").is_ok(),
+            "Should have id column"
+        );
+        assert!(
+            schema.field_with_name("name").is_ok(),
+            "Should have name column"
+        );
+        assert!(
+            schema.field_with_name("isActive").is_err(),
+            "Should NOT have isActive column"
+        );
+
+        // Verify row count is still correct
+        let records = concat_batches(schema, &batches)?;
+        assert_eq!(records.num_rows(), 4, "Should have all 4 rows");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_row_predicate() -> Result<()> {
+        use arrow::array::BooleanArray;
+
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Filter rows where isActive = true (Carol and Diana)
+        let options = ReadOptions::new().with_row_predicate(|batch: 
&RecordBatch| {
+            let col = batch
+                .column_by_name("isActive")
+                .ok_or_else(|| hudi_core::error::CoreError::Schema("isActive 
not found".into()))?;
+            let arr = col
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .ok_or_else(|| hudi_core::error::CoreError::Schema("Not 
boolean".into()))?;
+            Ok(arr.clone())
+        });
+
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce at least one batch");
+
+        let schema = &batches[0].schema();
+        let records = concat_batches(schema, &batches)?;
+
+        // Should only have Carol and Diana (isActive = true)
+        let sample_data = SampleTable::sample_data_order_by_id(&records);
+        assert_eq!(sample_data, vec![(3, "Carol", true), (4, "Diana", true)]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_with_projection_and_row_predicate() -> 
Result<()> {
+        use arrow::array::{BooleanArray, Int32Array};
+
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Project only id and isActive, filter where isActive = true
+        let options = ReadOptions::new()
+            .with_projection(["id", "isActive"])
+            .with_row_predicate(|batch: &RecordBatch| {
+                let col = batch.column_by_name("isActive").ok_or_else(|| {
+                    hudi_core::error::CoreError::Schema("isActive not 
found".into())
+                })?;
+                let arr = col
+                    .as_any()
+                    .downcast_ref::<BooleanArray>()
+                    .ok_or_else(|| hudi_core::error::CoreError::Schema("Not 
boolean".into()))?;
+                Ok(arr.clone())
+            });
+
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        let mut batches = Vec::new();
+        while let Some(result) = stream.next().await {
+            batches.push(result?);
+        }
+
+        assert!(!batches.is_empty(), "Should produce at least one batch");
+
+        // Verify only projected columns
+        let schema = &batches[0].schema();
+        assert_eq!(schema.fields().len(), 2, "Should only have 2 columns");
+        assert!(schema.field_with_name("id").is_ok());
+        assert!(schema.field_with_name("isActive").is_ok());
+        assert!(schema.field_with_name("name").is_err());
+
+        // Verify filtered rows (only active users: Carol=3, Diana=4)
+        let records = concat_batches(schema, &batches)?;
+        assert_eq!(records.num_rows(), 2, "Should have 2 rows");
+
+        let ids = records
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let id_values: Vec<i32> = ids.iter().flatten().collect();
+        assert!(id_values.contains(&3) && id_values.contains(&4));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_snapshot_stream_projection_invalid_column() -> 
Result<()> {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await?;
+
+        // Request a non-existent column
+        let options = ReadOptions::new().with_projection(["id", 
"nonexistent_column"]);
+        let mut stream = hudi_table.read_snapshot_stream(&options).await?;
+
+        // Error occurs when polling the stream (lazy evaluation)
+        let mut found_error = false;
+        while let Some(result) = stream.next().await {
+            match result {
+                Ok(_) => {}
+                Err(err) => {
+                    assert!(
+                        err.to_string().contains("nonexistent_column"),
+                        "Error should mention the invalid column name, got: 
{err}"
+                    );
+                    found_error = true;
+                    break;
+                }
+            }
+        }
+        assert!(
+            found_error,
+            "Should have encountered an error for non-existent column"
+        );
+        Ok(())
+    }
 }
 
 /// Test module for tables with metadata table (MDT) enabled.
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index fadb942..be5b314 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -53,5 +53,8 @@ async-trait = { workspace = true }
 tokio = { workspace = true }
 url = { workspace = true }
 
+# logging
+log = { workspace = true }
+
 [dev-dependencies]
 hudi-test = { path = "../test" }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f502d26..aeb4e7b 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -43,6 +43,7 @@ use datafusion_common::DataFusionError::Execution;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_expr::{CreateExternalTable, Expr, TableProviderFilterPushDown, 
TableType};
 use datafusion_physical_expr::create_physical_expr;
+use log::warn;
 
 use crate::util::expr::exprs_to_filters;
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
@@ -79,9 +80,31 @@ use hudi_core::table::Table as HudiTable;
 ///     Ok(())
 /// }
 /// ```
-#[derive(Clone, Debug)]
+/// A DataFusion table provider for Apache Hudi tables.
+#[derive(Clone)]
 pub struct HudiDataSource {
     table: Arc<HudiTable>,
+    /// Cached partition schema for determining partition columns.
+    /// This is cached at construction since partition schema rarely changes
+    /// and is needed synchronously in `supports_filters_pushdown`.
+    partition_schema: Schema,
+}
+
+impl std::fmt::Debug for HudiDataSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HudiDataSource")
+            .field("table", &self.table)
+            .field(
+                "partition_columns",
+                &self
+                    .partition_schema
+                    .fields()
+                    .iter()
+                    .map(|field| field.name())
+                    .collect::<Vec<_>>(),
+            )
+            .finish()
+    }
 }
 
 impl HudiDataSource {
@@ -95,10 +118,23 @@ impl HudiDataSource {
         K: AsRef<str>,
         V: Into<String>,
     {
-        match HudiTable::new_with_options(base_uri, options).await {
-            Ok(t) => Ok(Self { table: Arc::new(t) }),
-            Err(e) => Err(Execution(format!("Failed to create Hudi table: 
{e}"))),
-        }
+        let table = HudiTable::new_with_options(base_uri, options)
+            .await
+            .map_err(|e| Execution(format!("Failed to create Hudi table: 
{e}")))?;
+
+        // Cache partition schema at construction for use in 
supports_filters_pushdown
+        let partition_schema = match table.get_partition_schema().await {
+            Ok(s) => s,
+            Err(e) => {
+                warn!("Failed to get partition schema, using empty schema: 
{e}");
+                Schema::empty()
+            }
+        };
+
+        Ok(Self {
+            table: Arc::new(table),
+            partition_schema,
+        })
     }
 
     fn get_input_partitions(&self) -> usize {
@@ -110,21 +146,45 @@ impl HudiDataSource {
 
     /// Check if the given expression can be pushed down to the Hudi table.
     ///
-    /// The expression can be pushed down if it is a binary expression with a 
supported operator and operands.
+    /// The expression can be pushed down if it is:
+    /// - A binary expression with a supported operator and operands
+    /// - A NOT expression wrapping a pushable expression
+    /// - An AND compound expression where at least one side can be pushed down
+    /// - A BETWEEN expression with column and literals
     fn can_push_down(&self, expr: &Expr) -> bool {
         match expr {
             Expr::BinaryExpr(binary_expr) => {
                 let left = &binary_expr.left;
                 let op = &binary_expr.op;
                 let right = &binary_expr.right;
-                self.is_supported_operator(op)
-                    && self.is_supported_operand(left)
-                    && self.is_supported_operand(right)
+
+                match op {
+                    Operator::And => {
+                        // AND is pushable if at least one side is pushable
+                        self.can_push_down(left) || self.can_push_down(right)
+                    }
+                    Operator::Or => {
+                        // OR cannot be pushed down with current filter model
+                        false
+                    }
+                    _ => {
+                        self.is_supported_operator(op)
+                            && self.is_supported_operand(left)
+                            && self.is_supported_operand(right)
+                    }
+                }
             }
             Expr::Not(inner_expr) => {
                 // Recursively check if the inner expression can be pushed down
                 self.can_push_down(inner_expr)
             }
+            Expr::Between(between) => {
+                // BETWEEN can be pushed if expr is a column and bounds are 
literals
+                !between.negated
+                    && matches!(&*between.expr, Expr::Column(_))
+                    && matches!(&*between.low, Expr::Literal(..))
+                    && matches!(&*between.high, Expr::Literal(..))
+            }
             _ => false,
         }
     }
@@ -148,6 +208,55 @@ impl HudiDataSource {
             _ => false,
         }
     }
+
+    /// Returns partition column names from partition schema.
+    fn get_partition_columns(&self) -> Vec<String> {
+        self.partition_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().clone())
+            .collect()
+    }
+
+    /// Checks if expression filters on a partition column.
+    ///
+    /// Partition column filters can be marked as `Exact` because they are
+    /// fully handled by partition pruning and don't need post-filtering.
+    fn is_partition_column_filter(expr: &Expr, partition_cols: &[String]) -> 
bool {
+        match expr {
+            Expr::BinaryExpr(binary_expr) => {
+                match binary_expr.op {
+                    Operator::And => {
+                        // For AND, check if both sides are partition filters
+                        Self::is_partition_column_filter(&binary_expr.left, 
partition_cols)
+                            && 
Self::is_partition_column_filter(&binary_expr.right, partition_cols)
+                    }
+                    _ => {
+                        // For partition filters, one side must be a partition 
column
+                        // and the other side must be a literal value
+                        match (&*binary_expr.left, &*binary_expr.right) {
+                            (Expr::Column(col), Expr::Literal(..))
+                                if partition_cols.contains(&col.name) =>
+                            {
+                                true
+                            }
+                            (Expr::Literal(..), Expr::Column(col))
+                                if partition_cols.contains(&col.name) =>
+                            {
+                                true
+                            }
+                            _ => false,
+                        }
+                    }
+                }
+            }
+            Expr::Not(inner) => Self::is_partition_column_filter(inner, 
partition_cols),
+            Expr::Between(between) => {
+                matches!(&*between.expr, Expr::Column(col) if 
partition_cols.contains(&col.name))
+            }
+            _ => false,
+        }
+    }
 }
 
 #[async_trait]
@@ -241,13 +350,20 @@ impl TableProvider for HudiDataSource {
         &self,
         filters: &[&Expr],
     ) -> Result<Vec<TableProviderFilterPushDown>> {
+        let partition_cols = self.get_partition_columns();
+
         filters
             .iter()
             .map(|expr| {
-                if self.can_push_down(expr) {
-                    Ok(TableProviderFilterPushDown::Inexact)
+                if !self.can_push_down(expr) {
+                    return Ok(TableProviderFilterPushDown::Unsupported);
+                }
+
+                // Partition column filters are fully handled by partition 
pruning
+                if Self::is_partition_column_filter(expr, &partition_cols) {
+                    Ok(TableProviderFilterPushDown::Exact)
                 } else {
-                    Ok(TableProviderFilterPushDown::Unsupported)
+                    Ok(TableProviderFilterPushDown::Inexact)
                 }
             })
             .collect()
@@ -337,28 +453,15 @@ impl TableProviderFactory for HudiTableFactory {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::execution::session_state::SessionStateBuilder;
-    use datafusion::prelude::{SessionConfig, SessionContext};
-    use datafusion_common::{Column, DataFusionError, ScalarValue};
+    use datafusion_common::{Column, ScalarValue};
     use std::fs::canonicalize;
     use std::path::Path;
-    use std::sync::Arc;
     use url::Url;
 
     use datafusion::logical_expr::BinaryExpr;
-    use hudi_core::config::read::HudiReadConfig::InputPartitions;
-    use hudi_core::metadata::meta_field::MetaField;
-    use hudi_test::SampleTable::{
-        V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, 
V6SimplekeygenHivestyleNoMetafields,
-        V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
-        V6TimebasedkeygenNonhivestyle,
-    };
-    use hudi_test::assert_arrow_field_names_eq;
-    use hudi_test::{SampleTable, util};
-    use util::{get_bool_column, get_i32_column, get_str_column};
+    use hudi_test::SampleTable::{V6Nonpartitioned, V6SimplekeygenNonhivestyle};
 
     use crate::HudiDataSource;
-    use crate::HudiTableFactory;
 
     #[tokio::test]
     async fn get_default_input_partitions() {
@@ -369,196 +472,6 @@ mod tests {
         assert_eq!(hudi.get_input_partitions(), 0)
     }
 
-    #[tokio::test]
-    async fn test_get_create_schema_from_empty_table() {
-        let table_provider =
-            HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), 
empty_options())
-                .await
-                .unwrap();
-        let schema = table_provider.schema();
-        assert_arrow_field_names_eq!(
-            schema,
-            [MetaField::field_names(), vec!["id", "name", "isActive"]].concat()
-        );
-    }
-
-    async fn register_test_table_with_session<I, K, V>(
-        test_table: &SampleTable,
-        options: I,
-        use_sql: bool,
-    ) -> Result<SessionContext, DataFusionError>
-    where
-        I: IntoIterator<Item = (K, V)>,
-        K: AsRef<str>,
-        V: Into<String>,
-    {
-        let ctx = create_test_session().await;
-        if use_sql {
-            let create_table_sql = format!(
-                "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
-                test_table.as_ref(),
-                test_table.path_to_cow(),
-                concat_as_sql_options(options)
-            );
-            ctx.sql(create_table_sql.as_str()).await?;
-        } else {
-            let base_url = test_table.url_to_cow();
-            let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?;
-            ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
-        }
-        Ok(ctx)
-    }
-
-    async fn create_test_session() -> SessionContext {
-        let config = SessionConfig::new().set(
-            "datafusion.sql_parser.enable_ident_normalization",
-            &ScalarValue::from(false),
-        );
-        let table_factory: Arc<dyn TableProviderFactory> = 
Arc::new(HudiTableFactory::default());
-
-        let session_state = SessionStateBuilder::new()
-            .with_default_features()
-            .with_config(config)
-            .with_table_factories(HashMap::from([("HUDI".to_string(), 
table_factory)]))
-            .build();
-
-        SessionContext::new_with_state(session_state)
-    }
-
-    fn concat_as_sql_options<I, K, V>(options: I) -> String
-    where
-        I: IntoIterator<Item = (K, V)>,
-        K: AsRef<str>,
-        V: Into<String>,
-    {
-        let kv_pairs: Vec<String> = options
-            .into_iter()
-            .map(|(k, v)| format!("'{}' '{}'", k.as_ref(), v.into()))
-            .collect();
-
-        if kv_pairs.is_empty() {
-            String::new()
-        } else {
-            format!("OPTIONS ({})", kv_pairs.join(", "))
-        }
-    }
-
-    #[tokio::test]
-    async fn test_create_table_with_unknown_format() {
-        let test_table = V6Nonpartitioned;
-        let invalid_format = "UNKNOWN_FORMAT";
-        let create_table_sql = format!(
-            "CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
-            test_table.as_ref(),
-            invalid_format,
-            test_table.path_to_cow()
-        );
-
-        let ctx = create_test_session().await;
-        let result = ctx.sql(create_table_sql.as_str()).await;
-        assert!(result.is_err());
-    }
-
-    async fn verify_plan(
-        ctx: &SessionContext,
-        sql: &str,
-        table_name: &str,
-        planned_input_partitioned: &i32,
-    ) {
-        let explaining_df = ctx.sql(sql).await.unwrap().explain(false, 
true).unwrap();
-        let explaining_rb = explaining_df.collect().await.unwrap();
-        let explaining_rb = explaining_rb.first().unwrap();
-        let plan = get_str_column(explaining_rb, "plan").join("");
-        let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
-        assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
-        assert!(plan_lines[2].starts_with(&format!(
-            "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as isActive, \
-            get_field(structField@3, field2) as {table_name}.structField[field2]]"
-        )));
-        assert!(plan_lines[4].starts_with(
-            "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND get_field(structField@3, field2) > 30"
-        ));
-        assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
-    }
-
-    async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
-        let df = ctx.sql(sql).await.unwrap();
-        let rb = df.collect().await.unwrap();
-        let rb = rb.first().unwrap();
-        assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
-        assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
-        assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
-        assert_eq!(
-            get_i32_column(rb, &format!("{table_name}.structField[field2]")),
-            &[40, 50]
-        );
-    }
-
-    #[tokio::test]
-    async fn test_datafusion_read_hudi_table_with_partition_filter_pushdown() {
-        for (test_table, use_sql, planned_input_partitions) in &[
-            (V6ComplexkeygenHivestyle, true, 2),
-            (V6Nonpartitioned, true, 1),
-            (V6SimplekeygenNonhivestyle, false, 2),
-            (V6SimplekeygenHivestyleNoMetafields, true, 2),
-            (V6TimebasedkeygenNonhivestyle, false, 2),
-        ] {
-            println!(">>> testing for {}", test_table.as_ref());
-            let options = [(InputPartitions, "2")];
-            let ctx = register_test_table_with_session(test_table, options, 
*use_sql)
-                .await
-                .unwrap();
-
-            let sql = format!(
-                r#"
-            SELECT id, name, isActive, structField.field2
-            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
-            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
-                test_table.as_ref()
-            );
-
-            verify_plan(&ctx, &sql, test_table.as_ref(), 
planned_input_partitions).await;
-            verify_data(&ctx, &sql, test_table.as_ref()).await
-        }
-    }
-
-    async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, 
table_name: &str) {
-        let df = ctx.sql(sql).await.unwrap();
-        let rb = df.collect().await.unwrap();
-        let rb = rb.first().unwrap();
-        assert_eq!(get_i32_column(rb, "id"), &[4]);
-        assert_eq!(get_str_column(rb, "name"), &["Diana"]);
-        assert_eq!(get_bool_column(rb, "isActive"), &[false]);
-        assert_eq!(
-            get_i32_column(rb, &format!("{table_name}.structField[field2]")),
-            &[50]
-        );
-    }
-
-    #[tokio::test]
-    async fn test_datafusion_read_hudi_table_with_replacecommits_with_partition_filter_pushdown() {
-        for (test_table, use_sql, planned_input_partitions) in
-            &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
-        {
-            println!(">>> testing for {}", test_table.as_ref());
-            let ctx =
-                register_test_table_with_session(test_table, 
[(InputPartitions, "2")], *use_sql)
-                    .await
-                    .unwrap();
-
-            let sql = format!(
-                r#"
-            SELECT id, name, isActive, structField.field2
-            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
-            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
-                test_table.as_ref()
-            );
-
-            verify_plan(&ctx, &sql, test_table.as_ref(), 
planned_input_partitions).await;
-            verify_data_with_replacecommits(&ctx, &sql, 
test_table.as_ref()).await
-        }
-    }
-
     #[tokio::test]
     async fn test_supports_filters_pushdown() {
         let table_provider = HudiDataSource::new_with_options(
@@ -612,6 +525,7 @@ mod tests {
+        let result = table_provider.supports_filters_pushdown(&filters).unwrap();
 
         assert_eq!(result.len(), 6);
+        // Non-partitioned table - all filters are Inexact (no partition 
columns)
         assert_eq!(result[0], TableProviderFilterPushDown::Inexact);
         assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
         assert_eq!(result[2], TableProviderFilterPushDown::Unsupported);
@@ -619,4 +533,113 @@ mod tests {
         assert_eq!(result[4], TableProviderFilterPushDown::Unsupported);
         assert_eq!(result[5], TableProviderFilterPushDown::Inexact);
     }
+
+    #[tokio::test]
+    async fn test_supports_filters_pushdown_exact_for_partition_columns() {
+        // Use a partitioned table - byteField is the partition column
+        let table_provider = HudiDataSource::new_with_options(
+            V6SimplekeygenNonhivestyle.path_to_cow().as_str(),
+            empty_options(),
+        )
+        .await
+        .unwrap();
+
+        // Filter on partition column (byteField) - should be Exact
+        let partition_filter = Expr::BinaryExpr(BinaryExpr {
+            left: 
Box::new(Expr::Column(Column::from_name("byteField".to_string()))),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(ScalarValue::Int8(Some(1)), None)),
+        });
+
+        // Filter on non-partition column (name) - should be Inexact
+        let non_partition_filter = Expr::BinaryExpr(BinaryExpr {
+            left: 
Box::new(Expr::Column(Column::from_name("name".to_string()))),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(
+                ScalarValue::Utf8(Some("Alice".to_string())),
+                None,
+            )),
+        });
+
+        let filters = vec![&partition_filter, &non_partition_filter];
+        let result = 
table_provider.supports_filters_pushdown(&filters).unwrap();
+
+        assert_eq!(result.len(), 2);
+        // Partition column filter is Exact
+        assert_eq!(result[0], TableProviderFilterPushDown::Exact);
+        // Non-partition column filter is Inexact
+        assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
+    }
+
+    #[tokio::test]
+    async fn test_supports_filters_pushdown_and_between() {
+        let table_provider = HudiDataSource::new_with_options(
+            V6Nonpartitioned.path_to_cow().as_str(),
+            empty_options(),
+        )
+        .await
+        .unwrap();
+
+        // AND expression: name = 'Alice' AND intField > 100
+        let left = Expr::BinaryExpr(BinaryExpr {
+            left: 
Box::new(Expr::Column(Column::from_name("name".to_string()))),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(
+                ScalarValue::Utf8(Some("Alice".to_string())),
+                None,
+            )),
+        });
+        let right = Expr::BinaryExpr(BinaryExpr {
+            left: 
Box::new(Expr::Column(Column::from_name("intField".to_string()))),
+            op: Operator::Gt,
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(100)), 
None)),
+        });
+        let and_expr = Expr::BinaryExpr(BinaryExpr {
+            left: Box::new(left),
+            op: Operator::And,
+            right: Box::new(right),
+        });
+
+        // BETWEEN expression: intField BETWEEN 10 AND 100
+        let between_expr = Expr::Between(datafusion_expr::Between::new(
+            Box::new(Expr::Column(Column::from_name("intField".to_string()))),
+            false,
+            Box::new(Expr::Literal(ScalarValue::Int32(Some(10)), None)),
+            Box::new(Expr::Literal(ScalarValue::Int32(Some(100)), None)),
+        ));
+
+        // OR expression - should be unsupported
+        let or_left = Expr::BinaryExpr(BinaryExpr {
+            left: 
Box::new(Expr::Column(Column::from_name("name".to_string()))),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(
+                ScalarValue::Utf8(Some("Alice".to_string())),
+                None,
+            )),
+        });
+        let or_right = Expr::BinaryExpr(BinaryExpr {
+            left: 
Box::new(Expr::Column(Column::from_name("name".to_string()))),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(
+                ScalarValue::Utf8(Some("Bob".to_string())),
+                None,
+            )),
+        });
+        let or_expr = Expr::BinaryExpr(BinaryExpr {
+            left: Box::new(or_left),
+            op: Operator::Or,
+            right: Box::new(or_right),
+        });
+
+        let filters = vec![&and_expr, &between_expr, &or_expr];
+        let result = 
table_provider.supports_filters_pushdown(&filters).unwrap();
+
+        assert_eq!(result.len(), 3);
+        // AND expression is supported
+        assert_eq!(result[0], TableProviderFilterPushDown::Inexact);
+        // BETWEEN expression is supported
+        assert_eq!(result[1], TableProviderFilterPushDown::Inexact);
+        // OR expression is not supported
+        assert_eq!(result[2], TableProviderFilterPushDown::Unsupported);
+    }
 }
diff --git a/crates/datafusion/src/util/expr.rs 
b/crates/datafusion/src/util/expr.rs
index 5f20216..0b68649 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -18,34 +18,83 @@
  */
 
 use datafusion::logical_expr::Operator;
-use datafusion_expr::{BinaryExpr, Expr};
+use datafusion_expr::{Between, BinaryExpr, Expr};
 use hudi_core::expr::filter::{Filter as HudiFilter, col};
+use log::{debug, warn};
 
-/// Converts DataFusion expressions into Hudi filters.
+/// Extracts pushdown-safe filters from DataFusion expressions for partition pruning.
 ///
-/// Takes a slice of DataFusion [`Expr`] and attempts to convert each 
expression
-/// into a [`HudiFilter`]. Only binary expressions and NOT expressions are 
currently supported.
+/// Takes a slice of DataFusion [`Expr`] and extracts filters that can be 
safely
+/// pushed down for partition pruning. The returned filters represent a 
**subset**
+/// of the original expression's constraints.
+///
+/// # Supported Expressions
+/// - Binary comparisons: `=`, `!=`, `<`, `>`, `<=`, `>=`
+/// - `NOT` expressions: negates inner binary expression
+/// - `AND` compound expressions: recursively flattens both sides
+/// - `BETWEEN` expressions: converts to `>= low AND <= high`
+///
+/// # OR Expression Handling
+///
+/// `OR` expressions cannot be represented in the current filter model and are
+/// **skipped**. This means expressions containing `OR` will be **partially 
extracted**:
+///
+/// | Input Expression      | Extracted Filters | Notes                          |
+/// |-----------------------|-------------------|--------------------------------|
+/// | `A AND B`             | `[A, B]`          | Full extraction                |
+/// | `A OR B`              | `[]`              | OR skipped entirely            |
+/// | `A AND (B OR C)`      | `[A]`             | Only A extracted, OR skipped   |
+/// | `(A OR B) AND C`      | `[C]`             | Only C extracted, OR skipped   |
+///
+/// # Safety
+///
+/// This function is **safe for partition pruning** because:
+/// - Extracted filters are a weaker constraint (may match more rows than 
original)
+/// - Partitions that don't match extracted filters definitely don't match 
original
+/// - The original expression must still be applied to filter actual row data
+///
+/// **Callers must still apply the original expression for correctness.**
+/// The extracted filters are for optimization (pruning), not semantic 
equivalence.
 ///
 /// # Arguments
 /// * `exprs` - A slice of DataFusion expressions to convert
 ///
 /// # Returns
-/// Returns `Some(Vec<HudiFilter>)` if at least one filter is successfully 
converted,
-/// otherwise returns `None`.
-///
-/// TODO: Handle other DataFusion [`Expr`]
+/// A vector of filter tuples `(field_name, operator, value)`. All returned 
filters
+/// are implicitly AND-ed together.
 pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
     exprs
         .iter()
-        .filter_map(|expr| match expr {
-            Expr::BinaryExpr(binary_expr) => 
binary_expr_to_filter(binary_expr),
-            Expr::Not(not_expr) => not_expr_to_filter(not_expr),
-            _ => None,
-        })
+        .flat_map(expr_to_filters)
         .map(|filter| filter.into())
         .collect()
 }
 
+/// Recursively extracts pushdown-safe filters from a single expression.
+///
+/// OR expressions return empty (cannot be pushed down), which may result in
+/// partial extraction when OR is nested within AND expressions.
+fn expr_to_filters(expr: &Expr) -> Vec<HudiFilter> {
+    match expr {
+        Expr::BinaryExpr(binary_expr) => match binary_expr.op {
+            Operator::And => {
+                // Recursively flatten AND expressions
+                let mut filters = expr_to_filters(&binary_expr.left);
+                filters.extend(expr_to_filters(&binary_expr.right));
+                filters
+            }
+            Operator::Or => {
+                // Cannot represent OR in current filter model - skip
+                vec![]
+            }
+            _ => binary_expr_to_filter(binary_expr).into_iter().collect(),
+        },
+        Expr::Not(not_expr) => 
not_expr_to_filter(not_expr).into_iter().collect(),
+        Expr::Between(between) => between_to_filters(between),
+        _ => vec![],
+    }
+}
+
 /// Converts a binary expression [`Expr::BinaryExpr`] into a [`HudiFilter`].
 fn binary_expr_to_filter(binary_expr: &BinaryExpr) -> Option<HudiFilter> {
     // extract the column and literal from the binary expression
@@ -81,6 +130,52 @@ fn not_expr_to_filter(not_expr: &Expr) -> 
Option<HudiFilter> {
     }
 }
 
+/// Converts a BETWEEN expression into two filters: >= low AND <= high.
+///
+/// If `negated` is true, returns empty (NOT BETWEEN is complex to represent).
+fn between_to_filters(between: &Between) -> Vec<HudiFilter> {
+    if between.negated {
+        debug!("NOT BETWEEN expressions cannot be pushed down");
+        return vec![];
+    }
+
+    // Extract column name from the expression
+    let column_name = match &*between.expr {
+        Expr::Column(col) => col.name.clone(),
+        _ => {
+            debug!("BETWEEN with non-column expression cannot be pushed down");
+            return vec![];
+        }
+    };
+
+    // Extract literal values from low and high bounds
+    let low_str = match &*between.low {
+        Expr::Literal(lit, _) => lit.to_string(),
+        _ => {
+            warn!(
+                "BETWEEN low bound is not a literal for column 
'{column_name}', skipping pushdown"
+            );
+            return vec![];
+        }
+    };
+
+    let high_str = match &*between.high {
+        Expr::Literal(lit, _) => lit.to_string(),
+        _ => {
+            warn!(
+                "BETWEEN high bound is not a literal for column 
'{column_name}', skipping pushdown"
+            );
+            return vec![];
+        }
+    };
+
+    // Create two filters: >= low AND <= high
+    vec![
+        col(&column_name).gte(low_str),
+        col(&column_name).lte(high_str),
+    ]
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -256,10 +351,11 @@ mod tests {
 
     #[test]
     fn test_convert_expr_with_unsupported_operator() {
+        // Modulo operator is not supported
         let expr = Expr::BinaryExpr(BinaryExpr::new(
             Box::new(col("col")),
-            Operator::And,
-            Box::new(lit("value")),
+            Operator::Modulo,
+            Box::new(lit(2i32)),
         ));
 
         let filters = vec![expr];
@@ -267,6 +363,94 @@ mod tests {
         assert!(result.is_empty());
     }
 
+    #[test]
+    fn test_convert_and_compound_expr() {
+        // Test: col1 = 'a' AND col2 = 'b' should produce two filters
+        let left = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col("col1")),
+            Operator::Eq,
+            Box::new(lit("a")),
+        ));
+        let right = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col("col2")),
+            Operator::Eq,
+            Box::new(lit("b")),
+        ));
+        let and_expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(left),
+            Operator::And,
+            Box::new(right),
+        ));
+
+        let result = exprs_to_filters(&[and_expr]);
+
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0].0, "col1");
+        assert_eq!(result[0].1, "=");
+        assert_eq!(result[0].2, "a");
+        assert_eq!(result[1].0, "col2");
+        assert_eq!(result[1].1, "=");
+        assert_eq!(result[1].2, "b");
+    }
+
+    #[test]
+    fn test_convert_or_expr_returns_empty() {
+        // OR expressions cannot be pushed down
+        let left = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col("col1")),
+            Operator::Eq,
+            Box::new(lit("a")),
+        ));
+        let right = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col("col2")),
+            Operator::Eq,
+            Box::new(lit("b")),
+        ));
+        let or_expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(left),
+            Operator::Or,
+            Box::new(right),
+        ));
+
+        let result = exprs_to_filters(&[or_expr]);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_convert_between_expr() {
+        // Test: col BETWEEN 10 AND 20 should produce >= 10 AND <= 20
+        let between = Expr::Between(Between::new(
+            Box::new(col("count")),
+            false,
+            Box::new(lit(10i32)),
+            Box::new(lit(20i32)),
+        ));
+
+        let result = exprs_to_filters(&[between]);
+
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0].0, "count");
+        assert_eq!(result[0].1, ">=");
+        assert_eq!(result[0].2, "10");
+        assert_eq!(result[1].0, "count");
+        assert_eq!(result[1].1, "<=");
+        assert_eq!(result[1].2, "20");
+    }
+
+    #[test]
+    fn test_convert_not_between_returns_empty() {
+        // NOT BETWEEN cannot be represented in current filter model
+        let not_between = Expr::Between(Between::new(
+            Box::new(col("count")),
+            true, // negated
+            Box::new(lit(10i32)),
+            Box::new(lit(20i32)),
+        ));
+
+        let result = exprs_to_filters(&[not_between]);
+        assert!(result.is_empty());
+    }
+
     #[test]
     fn test_negate_operator_for_all_ops() {
         for (op, _) in ExprOperator::TOKEN_OP_PAIRS {
@@ -279,4 +463,196 @@ mod tests {
             }
         }
     }
+
+    // 
=========================================================================
+    // Partial extraction tests for OR expressions
+    // 
=========================================================================
+    //
+    // These tests verify the documented behavior: OR expressions cannot be
+    // pushed down, so expressions containing OR are partially extracted.
+    // This is safe for partition pruning (extracted filters are weaker
+    // constraints) but callers must apply original expressions for 
correctness.
+
+    #[test]
+    fn test_partial_extraction_and_with_or_on_right() {
+        // Test: A AND (B OR C) should extract only [A]
+        // The OR subtree is skipped, leaving only the left AND operand
+        let a = col("col_a").eq(lit("a"));
+        let b = col("col_b").eq(lit("b"));
+        let c = col("col_c").eq(lit("c"));
+
+        // Build: (B OR C)
+        let b_or_c = Expr::BinaryExpr(BinaryExpr::new(Box::new(b), 
Operator::Or, Box::new(c)));
+
+        // Build: A AND (B OR C)
+        let expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(a),
+            Operator::And,
+            Box::new(b_or_c),
+        ));
+
+        let result = exprs_to_filters(&[expr]);
+
+        // Only A should be extracted; (B OR C) is skipped
+        assert_eq!(
+            result.len(),
+            1,
+            "Expected only 1 filter (A), OR subtree skipped"
+        );
+        assert_eq!(result[0].0, "col_a");
+        assert_eq!(result[0].1, "=");
+        assert_eq!(result[0].2, "a");
+    }
+
+    #[test]
+    fn test_partial_extraction_and_with_or_on_left() {
+        // Test: (A OR B) AND C should extract only [C]
+        // The OR subtree is skipped, leaving only the right AND operand
+        let a = col("col_a").eq(lit("a"));
+        let b = col("col_b").eq(lit("b"));
+        let c = col("col_c").eq(lit("c"));
+
+        // Build: (A OR B)
+        let a_or_b = Expr::BinaryExpr(BinaryExpr::new(Box::new(a), 
Operator::Or, Box::new(b)));
+
+        // Build: (A OR B) AND C
+        let expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(a_or_b),
+            Operator::And,
+            Box::new(c),
+        ));
+
+        let result = exprs_to_filters(&[expr]);
+
+        // Only C should be extracted; (A OR B) is skipped
+        assert_eq!(
+            result.len(),
+            1,
+            "Expected only 1 filter (C), OR subtree skipped"
+        );
+        assert_eq!(result[0].0, "col_c");
+        assert_eq!(result[0].1, "=");
+        assert_eq!(result[0].2, "c");
+    }
+
+    #[test]
+    fn test_partial_extraction_complex_and_or_mix() {
+        // Test: (A AND B) AND (C OR D) should extract [A, B]
+        // The left AND subtree is fully extracted, right OR subtree is skipped
+        let a = col("col_a").eq(lit("a"));
+        let b = col("col_b").eq(lit("b"));
+        let c = col("col_c").eq(lit("c"));
+        let d = col("col_d").eq(lit("d"));
+
+        // Build: (A AND B)
+        let a_and_b = Expr::BinaryExpr(BinaryExpr::new(Box::new(a), 
Operator::And, Box::new(b)));
+
+        // Build: (C OR D)
+        let c_or_d = Expr::BinaryExpr(BinaryExpr::new(Box::new(c), 
Operator::Or, Box::new(d)));
+
+        // Build: (A AND B) AND (C OR D)
+        let expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(a_and_b),
+            Operator::And,
+            Box::new(c_or_d),
+        ));
+
+        let result = exprs_to_filters(&[expr]);
+
+        // A and B should be extracted; (C OR D) is skipped
+        assert_eq!(
+            result.len(),
+            2,
+            "Expected 2 filters (A, B), OR subtree skipped"
+        );
+        assert_eq!(result[0].0, "col_a");
+        assert_eq!(result[1].0, "col_b");
+    }
+
+    #[test]
+    fn test_partial_extraction_or_both_sides_skipped() {
+        // Test: (A OR B) AND (C OR D) should extract []
+        // Both sides are OR, so nothing can be extracted
+        let a = col("col_a").eq(lit("a"));
+        let b = col("col_b").eq(lit("b"));
+        let c = col("col_c").eq(lit("c"));
+        let d = col("col_d").eq(lit("d"));
+
+        let a_or_b = Expr::BinaryExpr(BinaryExpr::new(Box::new(a), 
Operator::Or, Box::new(b)));
+
+        let c_or_d = Expr::BinaryExpr(BinaryExpr::new(Box::new(c), 
Operator::Or, Box::new(d)));
+
+        let expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(a_or_b),
+            Operator::And,
+            Box::new(c_or_d),
+        ));
+
+        let result = exprs_to_filters(&[expr]);
+
+        // Both sides are OR, nothing extracted
+        assert!(
+            result.is_empty(),
+            "Expected empty result when both AND operands are OR"
+        );
+    }
+
+    #[test]
+    fn test_partial_extraction_deeply_nested() {
+        // Test: A AND (B AND (C OR D)) should extract [A, B]
+        // Nested AND is flattened, nested OR is skipped
+        let a = col("col_a").eq(lit("a"));
+        let b = col("col_b").eq(lit("b"));
+        let c = col("col_c").eq(lit("c"));
+        let d = col("col_d").eq(lit("d"));
+
+        // Build: (C OR D)
+        let c_or_d = Expr::BinaryExpr(BinaryExpr::new(Box::new(c), 
Operator::Or, Box::new(d)));
+
+        // Build: B AND (C OR D)
+        let b_and_c_or_d = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(b),
+            Operator::And,
+            Box::new(c_or_d),
+        ));
+
+        // Build: A AND (B AND (C OR D))
+        let expr = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(a),
+            Operator::And,
+            Box::new(b_and_c_or_d),
+        ));
+
+        let result = exprs_to_filters(&[expr]);
+
+        // A and B should be extracted from nested ANDs; (C OR D) is skipped
+        assert_eq!(
+            result.len(),
+            2,
+            "Expected 2 filters (A, B) from nested ANDs"
+        );
+        assert_eq!(result[0].0, "col_a");
+        assert_eq!(result[1].0, "col_b");
+    }
+
+    #[test]
+    fn test_partial_extraction_multiple_input_exprs() {
+        // Test: Multiple expressions in input slice
+        // Input: [A, (B OR C)] should extract [A] (B OR C skipped)
+        let a = col("col_a").eq(lit("a"));
+        let b = col("col_b").eq(lit("b"));
+        let c = col("col_c").eq(lit("c"));
+
+        let b_or_c = Expr::BinaryExpr(BinaryExpr::new(Box::new(b), 
Operator::Or, Box::new(c)));
+
+        let result = exprs_to_filters(&[a, b_or_c]);
+
+        // Only A from first expr; second expr (B OR C) is skipped entirely
+        assert_eq!(
+            result.len(),
+            1,
+            "Expected 1 filter from first expr, OR expr skipped"
+        );
+        assert_eq!(result[0].0, "col_a");
+    }
 }
diff --git a/crates/datafusion/tests/read_tests.rs 
b/crates/datafusion/tests/read_tests.rs
new file mode 100644
index 0000000..5886c4f
--- /dev/null
+++ b/crates/datafusion/tests/read_tests.rs
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! E2E tests for DataFusion integration with Hudi tables.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::catalog::TableProviderFactory;
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result;
+use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{DataFusionError, ScalarValue};
+
+use hudi_core::config::read::HudiReadConfig::InputPartitions;
+use hudi_core::config::util::empty_options;
+use hudi_core::metadata::meta_field::MetaField;
+use hudi_datafusion::{HudiDataSource, HudiTableFactory};
+use hudi_test::util::{get_bool_column, get_i32_column, get_str_column};
+use hudi_test::{SampleTable, assert_arrow_field_names_eq};
+
+// ============================================================================
+// Helper Functions
+// ============================================================================
+
+async fn create_test_session() -> SessionContext {
+    let config = SessionConfig::new().set(
+        "datafusion.sql_parser.enable_ident_normalization",
+        &ScalarValue::from(false),
+    );
+    let table_factory: Arc<dyn TableProviderFactory> = 
Arc::new(HudiTableFactory::default());
+
+    let session_state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(config)
+        .with_table_factories(HashMap::from([("HUDI".to_string(), 
table_factory)]))
+        .build();
+
+    SessionContext::new_with_state(session_state)
+}
+
+async fn register_test_table_with_session<I, K, V>(
+    test_table: &SampleTable,
+    options: I,
+    use_sql: bool,
+) -> Result<SessionContext, DataFusionError>
+where
+    I: IntoIterator<Item = (K, V)>,
+    K: AsRef<str>,
+    V: Into<String>,
+{
+    let ctx = create_test_session().await;
+    if use_sql {
+        let create_table_sql = format!(
+            "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
+            test_table.as_ref(),
+            test_table.path_to_cow(),
+            concat_as_sql_options(options)
+        );
+        ctx.sql(create_table_sql.as_str()).await?;
+    } else {
+        let base_url = test_table.url_to_cow();
+        let hudi = HudiDataSource::new_with_options(base_url.as_str(), 
options).await?;
+        ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
+    }
+    Ok(ctx)
+}
+
+/// Register a table with the given session using direct registration (not 
SQL).
+async fn register_table_direct<I, K, V>(
+    test_table: &SampleTable,
+    options: I,
+) -> Result<SessionContext, DataFusionError>
+where
+    I: IntoIterator<Item = (K, V)>,
+    K: AsRef<str>,
+    V: Into<String>,
+{
+    let ctx = create_test_session().await;
+    let base_url = test_table.url_to_cow();
+    let hudi = HudiDataSource::new_with_options(base_url.as_str(), 
options).await?;
+    ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
+    Ok(ctx)
+}
+
/// Renders key/value pairs as a SQL `OPTIONS ('k' 'v', ...)` clause for
/// `CREATE EXTERNAL TABLE`, or an empty string when there are no options.
fn concat_as_sql_options<I, K, V>(options: I) -> String
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str>,
    V: Into<String>,
{
    let mut rendered = Vec::new();
    for (key, value) in options {
        rendered.push(format!("'{}' '{}'", key.as_ref(), value.into()));
    }

    match rendered.as_slice() {
        // No options at all: emit nothing rather than an empty OPTIONS ().
        [] => String::new(),
        _ => format!("OPTIONS ({})", rendered.join(", ")),
    }
}
+
+async fn verify_plan(
+    ctx: &SessionContext,
+    sql: &str,
+    table_name: &str,
+    planned_input_partitioned: &i32,
+) {
+    let explaining_df = ctx.sql(sql).await.unwrap().explain(false, 
true).unwrap();
+    let explaining_rb = explaining_df.collect().await.unwrap();
+    let explaining_rb = explaining_rb.first().unwrap();
+    let plan = get_str_column(explaining_rb, "plan").join("");
+    let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
+    assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
+    assert!(plan_lines[2].starts_with(&format!(
+        "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as 
isActive, \
+        get_field(structField@3, field2) as {table_name}.structField[field2]]"
+    )));
+    assert!(plan_lines[4].starts_with(
+        "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND 
get_field(structField@3, field2) > 30"
+    ));
+    
assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
+}
+
+async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
+    let df = ctx.sql(sql).await.unwrap();
+    let rb = df.collect().await.unwrap();
+    let rb = rb.first().unwrap();
+    assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
+    assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
+    assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
+    assert_eq!(
+        get_i32_column(rb, &format!("{table_name}.structField[field2]")),
+        &[40, 50]
+    );
+}
+
+async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, 
table_name: &str) {
+    let df = ctx.sql(sql).await.unwrap();
+    let rb = df.collect().await.unwrap();
+    let rb = rb.first().unwrap();
+    assert_eq!(get_i32_column(rb, "id"), &[4]);
+    assert_eq!(get_str_column(rb, "name"), &["Diana"]);
+    assert_eq!(get_bool_column(rb, "isActive"), &[false]);
+    assert_eq!(
+        get_i32_column(rb, &format!("{table_name}.structField[field2]")),
+        &[50]
+    );
+}
+
+// ============================================================================
+// V6 Table Tests (moved from lib.rs)
+// ============================================================================
+
mod v6_tests {
    use super::*;
    use hudi_test::SampleTable::{
        V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
        V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
        V6TimebasedkeygenNonhivestyle,
    };

    // An empty table has no data files, so the exposed schema must come from
    // table metadata alone: the meta fields followed by the declared columns.
    #[tokio::test]
    async fn test_get_create_schema_from_empty_table() {
        let table_provider =
            HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), empty_options())
                .await
                .unwrap();
        let schema = table_provider.schema();
        assert_arrow_field_names_eq!(
            schema,
            [MetaField::field_names(), vec!["id", "name", "isActive"]].concat()
        );
    }

    // CREATE EXTERNAL TABLE with an unregistered format keyword must be
    // rejected when the statement is planned.
    #[tokio::test]
    async fn test_create_table_with_unknown_format() {
        let test_table = V6Nonpartitioned;
        let invalid_format = "UNKNOWN_FORMAT";
        let create_table_sql = format!(
            "CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
            test_table.as_ref(),
            invalid_format,
            test_table.path_to_cow()
        );

        let ctx = create_test_session().await;
        let result = ctx.sql(create_table_sql.as_str()).await;
        assert!(result.is_err());
    }

    // Covers several v6 table layouts (complex/simple/time-based key
    // generators, hive-style and non-hive-style partitioning), registered
    // both via SQL DDL and via direct TableProvider registration, and checks
    // the expected scan input-partition count per fixture.
    #[tokio::test]
    async fn test_datafusion_read_hudi_table_with_partition_filter_pushdown() {
        for (test_table, use_sql, planned_input_partitions) in &[
            (V6ComplexkeygenHivestyle, true, 2),
            (V6Nonpartitioned, true, 1),
            (V6SimplekeygenNonhivestyle, false, 2),
            (V6SimplekeygenHivestyleNoMetafields, true, 2),
            (V6TimebasedkeygenNonhivestyle, false, 2),
        ] {
            println!(">>> testing for {}", test_table.as_ref());
            let options = [(InputPartitions, "2")];
            let ctx = register_test_table_with_session(test_table, options, *use_sql)
                .await
                .unwrap();

            // Projection + struct-field access + filters + ORDER BY + LIMIT:
            // exercises both the physical plan shape and the returned rows.
            let sql = format!(
                r#"
            SELECT id, name, isActive, structField.field2
            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
                test_table.as_ref()
            );

            verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
            verify_data(&ctx, &sql, test_table.as_ref()).await
        }
    }

    // Same query shape as above, but against the overwrite-table fixture
    // (replacecommits), so only one row is expected to survive.
    #[tokio::test]
    async fn test_datafusion_read_hudi_table_with_replacecommits_with_partition_filter_pushdown() {
        for (test_table, use_sql, planned_input_partitions) in
            &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
        {
            println!(">>> testing for {}", test_table.as_ref());
            let ctx =
                register_test_table_with_session(test_table, [(InputPartitions, "2")], *use_sql)
                    .await
                    .unwrap();

            let sql = format!(
                r#"
            SELECT id, name, isActive, structField.field2
            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
                test_table.as_ref()
            );

            verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
            verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
        }
    }
}
+
+// ============================================================================
+// V8 Table Tests (new)
+// ============================================================================
+
+mod v8_tests {
+    use super::*;
+    use hudi_test::SampleTable::{
+        V8ComplexkeygenHivestyle, V8Nonpartitioned, V8SimplekeygenNonhivestyle,
+    };
+
+    #[tokio::test]
+    async fn test_v8_nonpartitioned_read() {
+        let test_table = V8Nonpartitioned;
+        println!(">>> testing V8 for {}", test_table.as_ref());
+
+        let ctx = register_table_direct(&test_table, [(InputPartitions, "2")])
+            .await
+            .unwrap();
+
+        // Verify schema
+        let df = ctx
+            .sql(&format!("SELECT * FROM {} LIMIT 1", test_table.as_ref()))
+            .await
+            .unwrap();
+        let schema = df.schema();
+        // V8 tables should have the expected columns
+        assert!(schema.field_with_name(None, "id").is_ok());
+        assert!(schema.field_with_name(None, "name").is_ok());
+        assert!(schema.field_with_name(None, "isActive").is_ok());
+
+        // Verify data read with filters
+        let sql = format!(
+            r#"SELECT id, name, isActive FROM {} WHERE id > 0 ORDER BY id"#,
+            test_table.as_ref()
+        );
+        let df = ctx.sql(&sql).await.unwrap();
+        let rb = df.collect().await.unwrap();
+        assert!(!rb.is_empty(), "Should return data from V8 table");
+
+        // Verify plan includes DataSourceExec
+        let explaining_df = ctx.sql(&sql).await.unwrap().explain(false, 
true).unwrap();
+        let explaining_rb = explaining_df.collect().await.unwrap();
+        let explaining_rb = explaining_rb.first().unwrap();
+        let plan = get_str_column(explaining_rb, "plan").join("");
+        assert!(
+            plan.contains("DataSourceExec"),
+            "Plan should contain DataSourceExec"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_v8_partitioned_filter_pushdown() {
+        let test_table = V8SimplekeygenNonhivestyle;
+        println!(">>> testing V8 for {}", test_table.as_ref());
+
+        let ctx = register_table_direct(&test_table, [(InputPartitions, "2")])
+            .await
+            .unwrap();
+
+        let sql = format!(
+            r#"
+            SELECT id, name, isActive, structField.field2
+            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
+            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+            test_table.as_ref()
+        );
+
+        // Verify plan
+        let explaining_df = ctx.sql(&sql).await.unwrap().explain(false, 
true).unwrap();
+        let explaining_rb = explaining_df.collect().await.unwrap();
+        let explaining_rb = explaining_rb.first().unwrap();
+        let plan = get_str_column(explaining_rb, "plan").join("");
+        let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
+
+        // Verify execution plan structure
+        assert!(
+            plan_lines[1].starts_with("SortExec: TopK(fetch=10)"),
+            "Should have TopK sort"
+        );
+        assert!(
+            plan_lines[2].contains("ProjectionExec"),
+            "Should have ProjectionExec"
+        );
+        assert!(
+            plan.contains("FilterExec"),
+            "Should have FilterExec for non-partition filters"
+        );
+        assert!(
+            plan.contains("input_partitions=2"),
+            "Should have input_partitions=2"
+        );
+
+        // Verify data
+        let df = ctx.sql(&sql).await.unwrap();
+        let rb = df.collect().await.unwrap();
+        let rb = rb.first().unwrap();
+        assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
+        assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
+        assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
+    }
+
+    #[tokio::test]
+    async fn test_v8_complex_keygen() {
+        let test_table = V8ComplexkeygenHivestyle;
+        println!(">>> testing V8 for {}", test_table.as_ref());
+
+        let ctx = register_table_direct(&test_table, [(InputPartitions, "2")])
+            .await
+            .unwrap();
+
+        let sql = format!(
+            r#"
+            SELECT id, name, isActive, structField.field2
+            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
+            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+            test_table.as_ref()
+        );
+
+        // Verify plan has correct input partitions for complex keygen
+        let explaining_df = ctx.sql(&sql).await.unwrap().explain(false, 
true).unwrap();
+        let explaining_rb = explaining_df.collect().await.unwrap();
+        let explaining_rb = explaining_rb.first().unwrap();
+        let plan = get_str_column(explaining_rb, "plan").join("");
+
+        assert!(
+            plan.contains("input_partitions=2"),
+            "Complex keygen table should have input_partitions=2"
+        );
+        assert!(
+            plan.contains("DataSourceExec"),
+            "Plan should contain DataSourceExec"
+        );
+
+        // Verify data
+        let df = ctx.sql(&sql).await.unwrap();
+        let rb = df.collect().await.unwrap();
+        let rb = rb.first().unwrap();
+        assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
+        assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
+        assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
+    }
+}

Reply via email to