This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c32e929  build(deps): upgrade datafusion to 52, require python 3.10 (#535)
c32e929 is described below

commit c32e929976c99dbc0872acb65b0f11705a7b149b
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Mar 7 00:17:32 2026 -0600

    build(deps): upgrade datafusion to 52, require python 3.10 (#535)
---
 .github/workflows/ci.yml              |  6 ++--
 Cargo.toml                            | 14 ++++-----
 crates/core/src/table/mod.rs          |  5 ++++
 crates/datafusion/src/lib.rs          | 37 +++++++++++++++++++----
 crates/datafusion/tests/read_tests.rs | 32 +++++++++++++-------
 crates/test/Cargo.toml                |  2 +-
 crates/test/src/lib.rs                |  6 +++-
 python/.gitignore                     |  2 ++
 python/Cargo.toml                     |  2 +-
 python/pyproject.toml                 |  7 ++---
 python/src/datafusion_internal.rs     | 56 ++++++++++++++++++++++++-----------
 11 files changed, 120 insertions(+), 49 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9eeda05..df27681 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -84,14 +84,14 @@ jobs:
       matrix:
         # TODO: add windows
         os: [ ubuntu-24.04, ubuntu-22.04-arm, macos-15 ]
-        python-version: [ '3.9', '3.13' ]
+        python-version: [ '3.10', '3.13' ]
         exclude:
           - os: ubuntu-24.04
-            python-version: '3.9'
+            python-version: '3.10'
           - os: ubuntu-22.04-arm
             python-version: '3.13'
           - os: macos-15
-            python-version: '3.9'
+            python-version: '3.10'
     runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v6
diff --git a/Cargo.toml b/Cargo.toml
index d1ea5d9..be59e70 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,11 +55,11 @@ apache-avro = { version = "0.21", features = ["derive"] }
 apache-avro-derive = { version = "0.21" }
 
 # datafusion
-datafusion = { version = "51" }
-datafusion-expr = { version = "51" }
-datafusion-common = { version = "51" }
-datafusion-physical-expr = { version = "51" }
-datafusion-ffi = { version = "51" }
+datafusion = { version = "52" }
+datafusion-expr = { version = "52" }
+datafusion-common = { version = "52" }
+datafusion-physical-expr = { version = "52" }
+datafusion-ffi = { version = "52" }
 
 # serde
 percent-encoding = { version = "2" }
@@ -87,7 +87,7 @@ futures = { version = "0.3" }
 tokio = { version = "1", features = ["rt-multi-thread"] }
 
 # protobuf
-prost = { version = "0.13" }
+prost = { version = "0.14" }
 
 # compression
 flate2 = { version = "1" }
@@ -95,4 +95,4 @@ flate2 = { version = "1" }
 # testing
 serial_test = { version = "3" }
 tempfile = { version = "3" }
-zip-extract = { version = "0.3" }
+zip = { version = "4", default-features = false, features = ["deflate"] }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 153105e..c151a63 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -1051,6 +1051,7 @@ mod tests {
     }
 
     #[test]
+    #[serial(env_vars)]
     fn hudi_table_get_schema_from_empty_table_without_create_schema() {
         let table = 
get_test_table_without_validation("table_props_no_create_schema");
 
@@ -1135,6 +1136,7 @@ mod tests {
     }
 
     #[test]
+    #[serial(env_vars)]
     fn validate_invalid_table_props() {
         let table = get_test_table_without_validation("table_props_invalid");
         let configs = table.hudi_configs;
@@ -1187,6 +1189,7 @@ mod tests {
     }
 
     #[test]
+    #[serial(env_vars)]
     fn get_invalid_table_props() {
         let table = get_test_table_without_validation("table_props_invalid");
         let configs = table.hudi_configs;
@@ -1209,6 +1212,7 @@ mod tests {
     }
 
     #[test]
+    #[serial(env_vars)]
     fn get_default_for_invalid_table_props() {
         let table = get_test_table_without_validation("table_props_invalid");
         let configs = table.hudi_configs;
@@ -1240,6 +1244,7 @@ mod tests {
     }
 
     #[test]
+    #[serial(env_vars)]
     fn get_valid_table_props() {
         let table = get_test_table_without_validation("table_props_valid");
         let configs = table.hudi_configs;
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index aeb4e7b..0674cd3 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -46,7 +46,7 @@ use datafusion_physical_expr::create_physical_expr;
 use log::warn;
 
 use crate::util::expr::exprs_to_filters;
-use hudi_core::config::read::HudiReadConfig::InputPartitions;
+use hudi_core::config::read::HudiReadConfig::{InputPartitions, 
UseReadOptimizedMode};
 use hudi_core::config::util::empty_options;
 use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
 use hudi_core::table::Table as HudiTable;
@@ -288,11 +288,34 @@ impl TableProvider for HudiDataSource {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         self.table.register_storage(state.runtime_env().clone());
 
+        // Only support COW tables, or MOR tables with read-optimized mode enabled.
+        if self.table.is_mor() {
+            let use_read_optimized: bool = self
+                .table
+                .hudi_configs
+                .get_or_default(UseReadOptimizedMode)
+                .into();
+            if !use_read_optimized {
+                return Err(Execution(
+                    "MOR table is not supported without read-optimized mode. \
+                     Set hoodie.read.use.read_optimized.mode=true to read only 
base files."
+                        .to_string(),
+                ));
+            }
+        }
+
+        // Resolve input partitions: use Hudi config if set, otherwise fall back
+        // to DataFusion's target_partitions (defaults to number of CPU cores).
+        let input_partitions = match self.get_input_partitions() {
+            0 => state.config_options().execution.target_partitions,
+            n => n,
+        };
+
         // Convert Datafusion `Expr` to `Filter`
         let pushdown_filters = exprs_to_filters(filters);
         let file_slices = self
             .table
-            .get_file_slices_splits(self.get_input_partitions(), 
pushdown_filters)
+            .get_file_slices_splits(input_partitions, pushdown_filters)
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from 
Hudi table: {e}")))?;
         let base_url = self.table.base_url();
@@ -324,7 +347,11 @@ impl TableProvider for HudiDataSource {
             crypto: Default::default(),
         };
         let table_schema = self.schema();
-        let mut parquet_source = ParquetSource::new(parquet_opts);
+        let mut parquet_source = ParquetSource::new(table_schema.clone())
+            .with_table_parquet_options(parquet_opts)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .with_enable_page_index(true);
         let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
         if let Some(expr) = filter {
             let df_schema = DFSchema::try_from(table_schema.clone())?;
@@ -337,9 +364,9 @@ impl TableProvider for HudiDataSource {
             .map(FileGroup::from)
             .collect();
 
-        let fsc = FileScanConfigBuilder::new(url, table_schema, 
Arc::new(parquet_source))
+        let fsc = FileScanConfigBuilder::new(url, Arc::new(parquet_source))
             .with_file_groups(file_groups)
-            .with_projection_indices(projection.cloned())
+            .with_projection_indices(projection.cloned())?
             .with_limit(limit)
             .build();
 
diff --git a/crates/datafusion/tests/read_tests.rs 
b/crates/datafusion/tests/read_tests.rs
index 5886c4f..9ba7b3e 100644
--- a/crates/datafusion/tests/read_tests.rs
+++ b/crates/datafusion/tests/read_tests.rs
@@ -128,16 +128,28 @@ async fn verify_plan(
     let explaining_rb = explaining_df.collect().await.unwrap();
     let explaining_rb = explaining_rb.first().unwrap();
     let plan = get_str_column(explaining_rb, "plan").join("");
-    let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
-    assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
-    assert!(plan_lines[2].starts_with(&format!(
-        "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as 
isActive, \
-        get_field(structField@3, field2) as {table_name}.structField[field2]]"
-    )));
-    assert!(plan_lines[4].starts_with(
-        "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND 
get_field(structField@3, field2) > 30"
-    ));
-    
assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
+    assert!(
+        plan.contains("SortExec: TopK(fetch=10)"),
+        "Plan should contain TopK sort"
+    );
+    assert!(
+        plan.contains(&format!(
+            "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as 
isActive, \
+            get_field(structField@3, field2) as 
{table_name}.structField[field2]]"
+        )),
+        "Plan should contain expected projection"
+    );
+    // With pushdown_filters enabled, simple predicates (id % 2 = 0, name != Alice)
+    // are pushed into the Parquet source. Only non-pushable predicates like
+    // struct field access remain in FilterExec.
+    assert!(
+        plan.contains("get_field(structField@3, field2) > 30"),
+        "Plan should contain struct field filter (either in FilterExec or 
DataSourceExec)"
+    );
+    assert!(
+        
plan.contains(&format!("input_partitions={planned_input_partitioned}")),
+        "Plan should contain expected 
input_partitions={planned_input_partitioned}"
+    );
 }
 
 async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml
index 114f013..3042a6a 100644
--- a/crates/test/Cargo.toml
+++ b/crates/test/Cargo.toml
@@ -39,4 +39,4 @@ url = { workspace = true }
 
 # testing
 tempfile = { workspace = true }
-zip-extract = { workspace = true }
+zip = { workspace = true }
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index 94b2dac..eed3888 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -24,13 +24,17 @@ use std::path::{Path, PathBuf};
 use strum_macros::{AsRefStr, EnumIter, EnumString};
 use tempfile::tempdir;
 use url::Url;
+use zip::ZipArchive;
 
 pub mod util;
 
 pub fn extract_test_table(zip_path: &Path) -> PathBuf {
     let target_dir = tempdir().unwrap().path().to_path_buf();
     let archive = fs::read(zip_path).unwrap();
-    zip_extract::extract(Cursor::new(archive), &target_dir, false).unwrap();
+    ZipArchive::new(Cursor::new(archive))
+        .unwrap()
+        .extract(&target_dir)
+        .unwrap();
     target_dir
 }
 
diff --git a/python/.gitignore b/python/.gitignore
index db2736b..cbe33c7 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -24,6 +24,8 @@ __pycache__/
 
 # C extensions
 *.so
+*.dylib
+*.dSYM
 
 # Distribution / packaging
 .Python
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3b0c973..9d95717 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -49,7 +49,7 @@ tokio = { workspace = true }
 
 [dependencies.pyo3]
 version = "0.26"
-features = ["extension-module", "abi3", "abi3-py39"]
+features = ["extension-module", "abi3", "abi3-py310"]
 
 [features]
 default = []
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 9a2e17a..273246f 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -23,12 +23,11 @@ build-backend = "maturin"
 name = "hudi"
 description = "Native Python binding for Apache Hudi, based on hudi-rs."
 urls = { repository = "https://github.com/apache/hudi-rs/tree/main/python/" }
-requires-python = ">=3.9"
+requires-python = ">=3.10"
 keywords = ["apachehudi", "hudi", "datalake", "arrow"]
 license = "Apache-2.0"
 classifiers = [
     "Programming Language :: Python :: 3",
-    "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
     "Programming Language :: Python :: 3.11",
     "Programming Language :: Python :: 3.12",
@@ -53,14 +52,14 @@ lint = [
     "mypy==1.19.1",   
 ]
 datafusion = [
-    "datafusion==50.1.0",
+    "datafusion==52.0.0",
 ]
 
 [tool.maturin]
 module-name = "hudi._internal"
 
 [tool.ruff]
-target-version = 'py39'
+target-version = 'py310'
 # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`)  codes by default.
 lint.select = [
     "E4",
diff --git a/python/src/datafusion_internal.rs 
b/python/src/datafusion_internal.rs
index c7dcd5c..242bd37 100644
--- a/python/src/datafusion_internal.rs
+++ b/python/src/datafusion_internal.rs
@@ -17,10 +17,43 @@
  * under the License.
  */
 
+use std::sync::Arc;
+
+use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use datafusion_ffi::table_provider::FFI_TableProvider;
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+use pyo3::types::{PyCapsule, PyCapsuleMethods};
+
 use crate::internal::{PythonError, rt};
 use hudi::HudiDataSource as InternalDataFusionHudiDataSource;
-use pyo3::{Bound, types::PyCapsule};
-use pyo3::{PyErr, PyResult, Python, pyclass, pymethods};
+
+/// Extract the `FFI_LogicalExtensionCodec` from a DataFusion session object.
+///
+/// The session object is expected to have a `__datafusion_logical_extension_codec__`
+/// method that returns a PyCapsule containing the codec.
+fn extract_codec(session: Bound<PyAny>) -> PyResult<FFI_LogicalExtensionCodec> 
{
+    let capsule_obj = if 
session.hasattr("__datafusion_logical_extension_codec__")? {
+        session
+            .getattr("__datafusion_logical_extension_codec__")?
+            .call0()?
+    } else {
+        session
+    };
+    let capsule = capsule_obj.downcast::<PyCapsule>()?;
+    if let Some(name) = capsule.name()? {
+        let name = name
+            .to_str()
+            .map_err(|e| PyValueError::new_err(format!("{e}")))?;
+        if name != "datafusion_logical_extension_codec" {
+            return Err(PyValueError::new_err(format!(
+                "Expected PyCapsule name 'datafusion_logical_extension_codec', 
got '{name}'"
+            )));
+        }
+    }
+    let codec = unsafe { capsule.reference::<FFI_LogicalExtensionCodec>() };
+    Ok(codec.clone())
+}
 
 #[cfg(not(tarpaulin_include))]
 #[pyclass(name = "HudiDataFusionDataSource")]
@@ -44,25 +77,14 @@ impl HudiDataFusionDataSource {
         Ok(HudiDataFusionDataSource { table: inner })
     }
 
-    #[pyo3(signature = ())]
     fn __datafusion_table_provider__<'py>(
         &self,
         py: Python<'py>,
+        session: Bound<'py, PyAny>,
     ) -> PyResult<Bound<'py, PyCapsule>> {
-        use datafusion_ffi::table_provider::FFI_TableProvider;
-        use std::ffi::CString;
-        use std::sync::Arc;
-        let capsule_name = 
CString::new("datafusion_table_provider").map_err(|e| {
-            PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("Invalid 
capsule name: {e}"))
-        })?;
-
-        // Clone the inner data source and wrap it in an Arc
         let provider = Arc::new(self.table.clone());
-
-        // Create the FFI wrapper
-        let ffi_provider = FFI_TableProvider::new(provider, false, None);
-
-        // Create and return the PyCapsule
-        PyCapsule::new(py, ffi_provider, Some(capsule_name))
+        let codec = extract_codec(session)?;
+        let ffi_provider = FFI_TableProvider::new_with_ffi_codec(provider, 
false, None, codec);
+        PyCapsule::new(py, ffi_provider, 
Some(cr"datafusion_table_provider".into()))
     }
 }

Reply via email to