This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ed980a  Bindings for LogicalPlan::Repartition (#285)
1ed980a is described below

commit 1ed980a8f24dcfb67070b7ec34468363056a5bbd
Author: Jeremy Dyer <[email protected]>
AuthorDate: Wed Mar 15 13:13:54 2023 -0400

    Bindings for LogicalPlan::Repartition (#285)
---
 datafusion/__init__.py           |   4 ++
 datafusion/tests/test_imports.py |   4 ++
 src/expr.rs                      |   3 +
 src/expr/repartition.rs          | 127 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 138 insertions(+)

diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index e640c04..bb1beac 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -83,6 +83,8 @@ from .expr import (
     CreateView,
     Distinct,
     DropTable,
+    Repartition,
+    Partitioning,
 )
 
 __version__ = importlib_metadata.version(__name__)
@@ -141,6 +143,8 @@ __all__ = [
     "CreateView",
     "Distinct",
     "DropTable",
+    "Repartition",
+    "Partitioning",
 ]
 
 
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index eaa2302..766ddce 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -84,6 +84,8 @@ from datafusion.expr import (
     CreateView,
     Distinct,
     DropTable,
+    Repartition,
+    Partitioning,
 )
 
 
@@ -157,6 +159,8 @@ def test_class_module_is_datafusion():
         CreateView,
         Distinct,
         DropTable,
+        Repartition,
+        Partitioning,
     ]:
         assert klass.__module__ == "datafusion.expr"
 
diff --git a/src/expr.rs b/src/expr.rs
index 579f509..7c80d0d 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -68,6 +68,7 @@ pub mod literal;
 pub mod logical_node;
 pub mod placeholder;
 pub mod projection;
+pub mod repartition;
 pub mod scalar_function;
 pub mod scalar_subquery;
 pub mod scalar_variable;
@@ -287,5 +288,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_class::<distinct::PyDistinct>()?;
     m.add_class::<subquery_alias::PySubqueryAlias>()?;
     m.add_class::<drop_table::PyDropTable>()?;
+    m.add_class::<repartition::PyPartitioning>()?;
+    m.add_class::<repartition::PyRepartition>()?;
     Ok(())
 }
diff --git a/src/expr/repartition.rs b/src/expr/repartition.rs
new file mode 100644
index 0000000..e3e14f8
--- /dev/null
+++ b/src/expr/repartition.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{self, Display, Formatter};
+
+use datafusion_expr::{logical_plan::Repartition, Expr, Partitioning};
+use pyo3::prelude::*;
+
+use crate::{errors::py_type_err, sql::logical::PyLogicalPlan};
+
+use super::{logical_node::LogicalNode, PyExpr};
+
+#[pyclass(name = "Repartition", module = "datafusion.expr", subclass)]
+#[derive(Clone)] // `to_variant` clones the wrapper when handing it to Python
+pub struct PyRepartition {
+    repartition: Repartition, // wrapped `datafusion_expr` Repartition plan node
+}
+
+#[pyclass(name = "Partitioning", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyPartitioning {
+    partitioning: Partitioning, // wrapped `datafusion_expr` Partitioning scheme
+}
+
+impl From<PyPartitioning> for Partitioning {
+    fn from(PyPartitioning { partitioning }: PyPartitioning) -> Self {
+        partitioning // unwrap: the Python wrapper holds exactly this value
+    }
+}
+
+impl From<Partitioning> for PyPartitioning {
+    fn from(scheme: Partitioning) -> Self {
+        Self { partitioning: scheme } // wrap the scheme for exposure to Python
+    }
+}
+
+impl From<PyRepartition> for Repartition {
+    fn from(PyRepartition { repartition }: PyRepartition) -> Self {
+        repartition // unwrap the plan node held by the Python wrapper
+    }
+}
+
+impl From<Repartition> for PyRepartition {
+    fn from(node: Repartition) -> Self {
+        Self { repartition: node } // wrap the plan node for exposure to Python
+    }
+}
+
+impl Display for PyRepartition {
+    fn fmt(&self, out: &mut Formatter) -> fmt::Result {
+        let node = &self.repartition; // borrow once; both fields are Debug-printed
+        out.write_fmt(format_args!(
+            "Repartition
+            input: {:?}
+            partitioning_scheme: {:?}",
+            &node.input, &node.partitioning_scheme,
+        ))
+    }
+}
+
+#[pymethods]
+impl PyRepartition {
+    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+        Ok(Self::inputs(self)) // delegates to the `LogicalNode::inputs` impl
+    }
+
+    /// Returns a copy of this node's partitioning scheme.
+    fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
+        Ok(self.repartition.partitioning_scheme.clone().into())
+    }
+
+    /// Returns the `DistributeBy` expressions; other schemes fail with `py_type_err`.
+    fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
+        match &self.repartition.partitioning_scheme {
+            Partitioning::DistributeBy(exprs) => {
+                Ok(exprs.iter().cloned().map(PyExpr::from).collect())
+            }
+            _ => Err(py_type_err("unexpected repartition strategy")),
+        }
+    }
+
+    /// Concatenated `DistributeBy` column names; non-column expressions fail, not panic.
+    fn distribute_columns(&self) -> PyResult<String> {
+        match &self.repartition.partitioning_scheme {
+            Partitioning::DistributeBy(distribute_list) => distribute_list
+                .iter()
+                .map(|e| match e {
+                    Expr::Column(column) => Ok(column.name.clone()),
+                    _ => Err(py_type_err("Encountered a type other than Expr::Column")),
+                })
+                .collect(), // first `Err` wins; otherwise names are joined without a separator
+            _ => Err(py_type_err("unexpected repartition strategy")),
+        }
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Repartition({})", self)) // body comes from the `Display` impl
+    }
+
+    fn __name__(&self) -> PyResult<String> {
+        Ok("Repartition".to_string())
+    }
+}
+
+impl LogicalNode for PyRepartition {
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
+        Vec::from([PyLogicalPlan::from((*self.repartition.input).clone())]) // one child
+    }
+
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Ok(Self::clone(self).into_py(py)) // Python receives its own copy of the node
+    }
+}

Reply via email to