This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 57ac912 [chore] Bump arrow version to 57 (#58)
57ac912 is described below
commit 57ac91249475befa3faad54da6b3771fee66e459
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Nov 24 10:40:02 2025 +0800
[chore] Bump arrow version to 57 (#58)
---
Cargo.toml | 2 +-
bindings/python/Cargo.toml | 8 +++++---
bindings/python/src/admin.rs | 6 +++---
bindings/python/src/connection.rs | 12 ++++++------
bindings/python/src/metadata.rs | 6 +++---
bindings/python/src/table.rs | 18 +++++++++---------
bindings/python/src/utils.rs | 30 ++++++++++++++++++------------
crates/fluss/Cargo.toml | 2 +-
8 files changed, 46 insertions(+), 38 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 54436ac..e745d95 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,5 +34,5 @@ members = ["crates/fluss", "crates/examples", "bindings/python"]
fluss = { version = "0.1.0", path = "./crates/fluss" }
tokio = { version = "1.44.2", features = ["full"] }
clap = { version = "4.5.37", features = ["derive"] }
-arrow = "55.1.0"
+arrow = "57.0.0"
chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 04826fb..9ecc629 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -27,11 +27,13 @@ name = "fluss"
crate-type = ["cdylib"]
[dependencies]
-pyo3 = { version = "0.24", features = ["extension-module"] }
+pyo3 = { version = "0.26.0", features = ["extension-module"] }
fluss = { path = "../../crates/fluss" }
tokio = { workspace = true }
arrow = { workspace = true }
-arrow-pyarrow = "55.1.0"
-pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
+arrow-pyarrow = "57.0.0"
+arrow-schema = "57.0.0"
+arrow-array = "57.0.0"
+pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
chrono = { workspace = true }
once_cell = "1.21.3"
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 73b2dd3..fa189eb 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -48,7 +48,7 @@ impl FlussAdmin {
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
- Python::with_gil(|py| Ok(py.None()))
+ Python::attach(|py| Ok(py.None()))
})
}
@@ -67,7 +67,7 @@ impl FlussAdmin {
.await
.map_err(|e| FlussError::new_err(format!("Failed to get table: {e}")))?;
- Python::with_gil(|py| {
+ Python::attach(|py| {
let table_info = TableInfo::from_core(core_table_info);
Py::new(py, table_info)
})
@@ -89,7 +89,7 @@ impl FlussAdmin {
.await
.map_err(|e| FlussError::new_err(format!("Failed to get lake snapshot: {e}")))?;
- Python::with_gil(|py| {
+ Python::attach(|py| {
let lake_snapshot = LakeSnapshot::from_core(core_lake_snapshot);
Py::new(py, lake_snapshot)
})
diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs
index aeb8410..a7559ce 100644
--- a/bindings/python/src/connection.rs
+++ b/bindings/python/src/connection.rs
@@ -41,7 +41,7 @@ impl FlussConnection {
inner: Arc::new(connection),
};
- Python::with_gil(|py| Py::new(py, py_connection))
+ Python::attach(|py| Py::new(py, py_connection))
})
}
@@ -57,7 +57,7 @@ impl FlussConnection {
let py_admin = FlussAdmin::from_core(admin);
- Python::with_gil(|py| Py::new(py, py_admin))
+ Python::attach(|py| Py::new(py, py_admin))
})
}
@@ -84,7 +84,7 @@ impl FlussConnection {
core_table.has_primary_key(),
);
- Python::with_gil(|py| Py::new(py, py_table))
+ Python::attach(|py| Py::new(py, py_table))
})
}
@@ -102,9 +102,9 @@ impl FlussConnection {
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __exit__(
&mut self,
- _exc_type: Option<PyObject>,
- _exc_value: Option<PyObject>,
- _traceback: Option<PyObject>,
+ _exc_type: Option<Bound<'_, PyAny>>,
+ _exc_value: Option<Bound<'_, PyAny>>,
+ _traceback: Option<Bound<'_, PyAny>>,
) -> PyResult<bool> {
self.close()?;
Ok(false)
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index 66748ab..bc5f288 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -106,7 +106,7 @@ impl Schema {
#[new]
#[pyo3(signature = (schema, primary_keys=None))]
pub fn new(
- schema: PyObject, // PyArrow schema
+ schema: Py<PyAny>, // PyArrow schema
primary_keys: Option<Vec<String>>,
) -> PyResult<Self> {
let arrow_schema = crate::utils::Utils::pyarrow_to_arrow_schema(&schema)?;
@@ -553,7 +553,7 @@ impl LakeSnapshot {
/// Get table bucket offsets as a Python dictionary with TableBucket keys
#[getter]
- pub fn table_buckets_offset(&self, py: Python) -> PyResult<PyObject> {
+ pub fn table_buckets_offset(&self, py: Python) -> PyResult<Py<PyAny>> {
let dict = PyDict::new(py);
for (bucket, offset) in &self.table_buckets_offset {
let py_bucket = TableBucket::from_core(bucket.clone());
@@ -569,7 +569,7 @@ impl LakeSnapshot {
}
/// Get all table buckets
- pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<PyObject>> {
+ pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
let mut buckets = Vec::new();
for bucket in self.table_buckets_offset.keys() {
let py_bucket = TableBucket::from_core(bucket.clone());
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index c255fa6..2a8df25 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -51,7 +51,7 @@ impl FlussTable {
let py_writer = AppendWriter::from_core(rust_writer);
- Python::with_gil(|py| Py::new(py, py_writer))
+ Python::attach(|py| Py::new(py, py_writer))
})
}
@@ -75,7 +75,7 @@ impl FlussTable {
.map_err(|e| FlussError::new_err(e.to_string()))?;
let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
- Python::with_gil(|py| Py::new(py, py_scanner))
+ Python::attach(|py| Py::new(py, py_scanner))
})
}
@@ -131,10 +131,10 @@ pub struct AppendWriter {
#[pymethods]
impl AppendWriter {
/// Write Arrow table data
-pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()> {
+pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
// Convert Arrow Table to batches and write each batch
let batches = table.call_method0(py, "to_batches")?;
- let batch_list: Vec<PyObject> = batches.extract(py)?;
+ let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
for batch in batch_list {
self.write_arrow_batch(py, batch)?;
@@ -143,7 +143,7 @@ impl AppendWriter {
}
/// Write Arrow batch data
-pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) -> PyResult<()> {
+pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
// Extract number of rows and columns from the Arrow batch
let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
let num_columns: usize = batch.getattr(py, "num_columns")?.extract(py)?;
@@ -175,7 +175,7 @@ impl AppendWriter {
}
/// Write Pandas DataFrame data
- pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
// Import pyarrow module
let pyarrow = py.import("pyarrow")?;
@@ -213,7 +213,7 @@ impl AppendWriter {
fn convert_python_value_to_datum(
&self,
py: Python,
- value: PyObject,
+ value: Py<PyAny>,
) -> PyResult<fcore::row::Datum<'static>> {
use fcore::row::{Blob, Datum, F32, F64};
@@ -321,7 +321,7 @@ impl LogScanner {
}
/// Convert all data to Arrow Table
- fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
use std::collections::HashMap;
use std::time::Duration;
@@ -387,7 +387,7 @@ impl LogScanner {
}
/// Convert all data to Pandas DataFrame
- fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+ fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
let arrow_table = self.to_arrow(py)?;
// Convert Arrow Table to Pandas DataFrame using pyarrow
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 93933b3..09e6b5f 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::*;
-use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
-use arrow_pyarrow::ToPyArrow;
+use arrow_pyarrow::{FromPyArrow, ToPyArrow};
+use arrow_schema::SchemaRef;
use std::sync::Arc;
/// Utilities for schema conversion between PyArrow, Arrow, and Fluss
@@ -25,11 +25,10 @@ pub struct Utils;
impl Utils {
/// Convert PyArrow schema to Rust Arrow schema
-pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> PyResult<SchemaRef> {
- Python::with_gil(|py| {
+pub fn pyarrow_to_arrow_schema(py_schema: &Py<PyAny>) -> PyResult<SchemaRef> {
+ Python::attach(|py| {
let schema_bound = py_schema.bind(py);
-
-let schema: ArrowSchema = arrow_pyarrow::FromPyArrow::from_pyarrow_bound(schema_bound)
+let schema: arrow_schema::Schema = FromPyArrow::from_pyarrow_bound(schema_bound)
.map_err(|e| {
FlussError::new_err(format!("Failed to convert PyArrow schema: {e}"))
})?;
@@ -172,14 +171,21 @@ impl Utils {
pub fn combine_batches_to_table(
py: Python,
batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
- ) -> PyResult<PyObject> {
- // Convert Rust Arrow RecordBatch to PyObject
- let py_batches: Result<Vec<PyObject>, _> = batches
+ ) -> PyResult<Py<PyAny>> {
+ use arrow_array::RecordBatch as ArrowArrayRecordBatch;
+
+ let py_batches: Result<Vec<Py<PyAny>>, _> = batches
.iter()
.map(|batch| {
- batch.as_ref().to_pyarrow(py).map_err(|e| {
-FlussError::new_err(format!("Failed to convert RecordBatch to PyObject: {e}"))
- })
+ArrowArrayRecordBatch::try_new(batch.schema().clone(), batch.columns().to_vec())
+.map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))
+ .and_then(|b| {
+ ToPyArrow::to_pyarrow(&b, py)
+ .map(|x| x.into())
+ .map_err(|e| {
+FlussError::new_err(format!("Failed to convert to PyObject: {e}"))
+ })
+ })
})
.collect();
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index ab1efc2..af77037 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -24,7 +24,7 @@ build = "src/build.rs"
[dependencies]
arrow = { workspace = true }
-arrow-schema = "55.1.0"
+arrow-schema = "57.0.0"
byteorder = "1.5"
futures = "0.3"
clap = { workspace = true }