This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c32e929 build(deps): upgrade datafusion to 52, require python 3.10 (#535)
c32e929 is described below
commit c32e929976c99dbc0872acb65b0f11705a7b149b
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Mar 7 00:17:32 2026 -0600
build(deps): upgrade datafusion to 52, require python 3.10 (#535)
---
.github/workflows/ci.yml | 6 ++--
Cargo.toml | 14 ++++-----
crates/core/src/table/mod.rs | 5 ++++
crates/datafusion/src/lib.rs | 37 +++++++++++++++++++----
crates/datafusion/tests/read_tests.rs | 32 +++++++++++++-------
crates/test/Cargo.toml | 2 +-
crates/test/src/lib.rs | 6 +++-
python/.gitignore | 2 ++
python/Cargo.toml | 2 +-
python/pyproject.toml | 7 ++---
python/src/datafusion_internal.rs | 56 ++++++++++++++++++++++++-----------
11 files changed, 120 insertions(+), 49 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9eeda05..df27681 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -84,14 +84,14 @@ jobs:
matrix:
# TODO: add windows
os: [ ubuntu-24.04, ubuntu-22.04-arm, macos-15 ]
- python-version: [ '3.9', '3.13' ]
+ python-version: [ '3.10', '3.13' ]
exclude:
- os: ubuntu-24.04
- python-version: '3.9'
+ python-version: '3.10'
- os: ubuntu-22.04-arm
python-version: '3.13'
- os: macos-15
- python-version: '3.9'
+ python-version: '3.10'
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v6
diff --git a/Cargo.toml b/Cargo.toml
index d1ea5d9..be59e70 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,11 +55,11 @@ apache-avro = { version = "0.21", features = ["derive"] }
apache-avro-derive = { version = "0.21" }
# datafusion
-datafusion = { version = "51" }
-datafusion-expr = { version = "51" }
-datafusion-common = { version = "51" }
-datafusion-physical-expr = { version = "51" }
-datafusion-ffi = { version = "51" }
+datafusion = { version = "52" }
+datafusion-expr = { version = "52" }
+datafusion-common = { version = "52" }
+datafusion-physical-expr = { version = "52" }
+datafusion-ffi = { version = "52" }
# serde
percent-encoding = { version = "2" }
@@ -87,7 +87,7 @@ futures = { version = "0.3" }
tokio = { version = "1", features = ["rt-multi-thread"] }
# protobuf
-prost = { version = "0.13" }
+prost = { version = "0.14" }
# compression
flate2 = { version = "1" }
@@ -95,4 +95,4 @@ flate2 = { version = "1" }
# testing
serial_test = { version = "3" }
tempfile = { version = "3" }
-zip-extract = { version = "0.3" }
+zip = { version = "4", default-features = false, features = ["deflate"] }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 153105e..c151a63 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -1051,6 +1051,7 @@ mod tests {
}
#[test]
+ #[serial(env_vars)]
fn hudi_table_get_schema_from_empty_table_without_create_schema() {
let table =
get_test_table_without_validation("table_props_no_create_schema");
@@ -1135,6 +1136,7 @@ mod tests {
}
#[test]
+ #[serial(env_vars)]
fn validate_invalid_table_props() {
let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
@@ -1187,6 +1189,7 @@ mod tests {
}
#[test]
+ #[serial(env_vars)]
fn get_invalid_table_props() {
let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
@@ -1209,6 +1212,7 @@ mod tests {
}
#[test]
+ #[serial(env_vars)]
fn get_default_for_invalid_table_props() {
let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
@@ -1240,6 +1244,7 @@ mod tests {
}
#[test]
+ #[serial(env_vars)]
fn get_valid_table_props() {
let table = get_test_table_without_validation("table_props_valid");
let configs = table.hudi_configs;
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index aeb4e7b..0674cd3 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -46,7 +46,7 @@ use datafusion_physical_expr::create_physical_expr;
use log::warn;
use crate::util::expr::exprs_to_filters;
-use hudi_core::config::read::HudiReadConfig::InputPartitions;
+use hudi_core::config::read::HudiReadConfig::{InputPartitions, UseReadOptimizedMode};
use hudi_core::config::util::empty_options;
use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
use hudi_core::table::Table as HudiTable;
@@ -288,11 +288,34 @@ impl TableProvider for HudiDataSource {
) -> Result<Arc<dyn ExecutionPlan>> {
self.table.register_storage(state.runtime_env().clone());
+ // Only support COW tables, or MOR tables with read-optimized mode enabled.
+ if self.table.is_mor() {
+ let use_read_optimized: bool = self
+ .table
+ .hudi_configs
+ .get_or_default(UseReadOptimizedMode)
+ .into();
+ if !use_read_optimized {
+ return Err(Execution(
+ "MOR table is not supported without read-optimized mode. \
+ Set hoodie.read.use.read_optimized.mode=true to read only base files."
+ .to_string(),
+ ));
+ }
+ }
+
+ // Resolve input partitions: use Hudi config if set, otherwise fall back
+ // to DataFusion's target_partitions (defaults to number of CPU cores).
+ let input_partitions = match self.get_input_partitions() {
+ 0 => state.config_options().execution.target_partitions,
+ n => n,
+ };
+
// Convert Datafusion `Expr` to `Filter`
let pushdown_filters = exprs_to_filters(filters);
let file_slices = self
.table
- .get_file_slices_splits(self.get_input_partitions(), pushdown_filters)
+ .get_file_slices_splits(input_partitions, pushdown_filters)
.await
.map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {e}")))?;
let base_url = self.table.base_url();
@@ -324,7 +347,11 @@ impl TableProvider for HudiDataSource {
crypto: Default::default(),
};
let table_schema = self.schema();
- let mut parquet_source = ParquetSource::new(parquet_opts);
+ let mut parquet_source = ParquetSource::new(table_schema.clone())
+ .with_table_parquet_options(parquet_opts)
+ .with_pushdown_filters(true)
+ .with_reorder_filters(true)
+ .with_enable_page_index(true);
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(expr) = filter {
let df_schema = DFSchema::try_from(table_schema.clone())?;
@@ -337,9 +364,9 @@ impl TableProvider for HudiDataSource {
.map(FileGroup::from)
.collect();
- let fsc = FileScanConfigBuilder::new(url, table_schema, Arc::new(parquet_source))
+ let fsc = FileScanConfigBuilder::new(url, Arc::new(parquet_source))
.with_file_groups(file_groups)
- .with_projection_indices(projection.cloned())
+ .with_projection_indices(projection.cloned())?
.with_limit(limit)
.build();
diff --git a/crates/datafusion/tests/read_tests.rs b/crates/datafusion/tests/read_tests.rs
index 5886c4f..9ba7b3e 100644
--- a/crates/datafusion/tests/read_tests.rs
+++ b/crates/datafusion/tests/read_tests.rs
@@ -128,16 +128,28 @@ async fn verify_plan(
let explaining_rb = explaining_df.collect().await.unwrap();
let explaining_rb = explaining_rb.first().unwrap();
let plan = get_str_column(explaining_rb, "plan").join("");
- let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
- assert!(plan_lines[1].starts_with("SortExec: TopK(fetch=10)"));
- assert!(plan_lines[2].starts_with(&format!(
- "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as isActive, \
- get_field(structField@3, field2) as {table_name}.structField[field2]]"
- )));
- assert!(plan_lines[4].starts_with(
- "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND get_field(structField@3, field2) > 30"
- ));
- assert!(plan_lines[5].contains(&format!("input_partitions={planned_input_partitioned}")));
+ assert!(
+ plan.contains("SortExec: TopK(fetch=10)"),
+ "Plan should contain TopK sort"
+ );
+ assert!(
+ plan.contains(&format!(
+ "ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as isActive, \
+ get_field(structField@3, field2) as {table_name}.structField[field2]]"
+ )),
+ "Plan should contain expected projection"
+ );
+ // With pushdown_filters enabled, simple predicates (id % 2 = 0, name != Alice)
+ // are pushed into the Parquet source. Only non-pushable predicates like
+ // struct field access remain in FilterExec.
+ assert!(
+ plan.contains("get_field(structField@3, field2) > 30"),
+ "Plan should contain struct field filter (either in FilterExec or DataSourceExec)"
+ );
+ assert!(
+ plan.contains(&format!("input_partitions={planned_input_partitioned}")),
+ "Plan should contain expected input_partitions={planned_input_partitioned}"
+ );
}
async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml
index 114f013..3042a6a 100644
--- a/crates/test/Cargo.toml
+++ b/crates/test/Cargo.toml
@@ -39,4 +39,4 @@ url = { workspace = true }
# testing
tempfile = { workspace = true }
-zip-extract = { workspace = true }
+zip = { workspace = true }
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index 94b2dac..eed3888 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -24,13 +24,17 @@ use std::path::{Path, PathBuf};
use strum_macros::{AsRefStr, EnumIter, EnumString};
use tempfile::tempdir;
use url::Url;
+use zip::ZipArchive;
pub mod util;
pub fn extract_test_table(zip_path: &Path) -> PathBuf {
let target_dir = tempdir().unwrap().path().to_path_buf();
let archive = fs::read(zip_path).unwrap();
- zip_extract::extract(Cursor::new(archive), &target_dir, false).unwrap();
+ ZipArchive::new(Cursor::new(archive))
+ .unwrap()
+ .extract(&target_dir)
+ .unwrap();
target_dir
}
diff --git a/python/.gitignore b/python/.gitignore
index db2736b..cbe33c7 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -24,6 +24,8 @@ __pycache__/
# C extensions
*.so
+*.dylib
+*.dSYM
# Distribution / packaging
.Python
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3b0c973..9d95717 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -49,7 +49,7 @@ tokio = { workspace = true }
[dependencies.pyo3]
version = "0.26"
-features = ["extension-module", "abi3", "abi3-py39"]
+features = ["extension-module", "abi3", "abi3-py310"]
[features]
default = []
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 9a2e17a..273246f 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -23,12 +23,11 @@ build-backend = "maturin"
name = "hudi"
description = "Native Python binding for Apache Hudi, based on hudi-rs."
urls = { repository = "https://github.com/apache/hudi-rs/tree/main/python/" }
-requires-python = ">=3.9"
+requires-python = ">=3.10"
keywords = ["apachehudi", "hudi", "datalake", "arrow"]
license = "Apache-2.0"
classifiers = [
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
@@ -53,14 +52,14 @@ lint = [
"mypy==1.19.1",
]
datafusion = [
- "datafusion==50.1.0",
+ "datafusion==52.0.0",
]
[tool.maturin]
module-name = "hudi._internal"
[tool.ruff]
-target-version = 'py39'
+target-version = 'py310'
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
lint.select = [
"E4",
diff --git a/python/src/datafusion_internal.rs b/python/src/datafusion_internal.rs
index c7dcd5c..242bd37 100644
--- a/python/src/datafusion_internal.rs
+++ b/python/src/datafusion_internal.rs
@@ -17,10 +17,43 @@
* under the License.
*/
+use std::sync::Arc;
+
+use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use datafusion_ffi::table_provider::FFI_TableProvider;
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+use pyo3::types::{PyCapsule, PyCapsuleMethods};
+
use crate::internal::{PythonError, rt};
use hudi::HudiDataSource as InternalDataFusionHudiDataSource;
-use pyo3::{Bound, types::PyCapsule};
-use pyo3::{PyErr, PyResult, Python, pyclass, pymethods};
+
+/// Extract the `FFI_LogicalExtensionCodec` from a DataFusion session object.
+///
+/// The session object is expected to have a `__datafusion_logical_extension_codec__`
+/// method that returns a PyCapsule containing the codec.
+fn extract_codec(session: Bound<PyAny>) -> PyResult<FFI_LogicalExtensionCodec> {
+ let capsule_obj = if session.hasattr("__datafusion_logical_extension_codec__")? {
+ session
+ .getattr("__datafusion_logical_extension_codec__")?
+ .call0()?
+ } else {
+ session
+ };
+ let capsule = capsule_obj.downcast::<PyCapsule>()?;
+ if let Some(name) = capsule.name()? {
+ let name = name
+ .to_str()
+ .map_err(|e| PyValueError::new_err(format!("{e}")))?;
+ if name != "datafusion_logical_extension_codec" {
+ return Err(PyValueError::new_err(format!(
+ "Expected PyCapsule name 'datafusion_logical_extension_codec', got '{name}'"
+ )));
+ }
+ }
+ let codec = unsafe { capsule.reference::<FFI_LogicalExtensionCodec>() };
+ Ok(codec.clone())
+}
#[cfg(not(tarpaulin_include))]
#[pyclass(name = "HudiDataFusionDataSource")]
@@ -44,25 +77,14 @@ impl HudiDataFusionDataSource {
Ok(HudiDataFusionDataSource { table: inner })
}
- #[pyo3(signature = ())]
fn __datafusion_table_provider__<'py>(
&self,
py: Python<'py>,
+ session: Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyCapsule>> {
- use datafusion_ffi::table_provider::FFI_TableProvider;
- use std::ffi::CString;
- use std::sync::Arc;
- let capsule_name = CString::new("datafusion_table_provider").map_err(|e| {
- PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("Invalid capsule name: {e}"))
- })?;
-
- // Clone the inner data source and wrap it in an Arc
let provider = Arc::new(self.table.clone());
-
- // Create the FFI wrapper
- let ffi_provider = FFI_TableProvider::new(provider, false, None);
-
- // Create and return the PyCapsule
- PyCapsule::new(py, ffi_provider, Some(capsule_name))
+ let codec = extract_codec(session)?;
+ let ffi_provider = FFI_TableProvider::new_with_ffi_codec(provider, false, None, codec);
+ PyCapsule::new(py, ffi_provider, Some(cr"datafusion_table_provider".into()))
}
}