This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 030873bb Add temporary view option for into_view (#1267)
030873bb is described below
commit 030873bbb1b98ec7664d1297b76a4f24dc205369
Author: Tim Saucer <[email protected]>
AuthorDate: Wed Oct 15 04:57:32 2025 -0400
Add temporary view option for into_view (#1267)
---
python/datafusion/dataframe.py | 4 +-
python/tests/test_context.py | 10 ++++-
src/dataframe.rs | 17 +++++---
src/table.rs | 94 ++++++++++++++++++++++++++++++++++++++++--
4 files changed, 111 insertions(+), 14 deletions(-)
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 0b77d42e..c21a3bc7 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -318,7 +318,7 @@ class DataFrame:
"""
self.df = df
- def into_view(self) -> Table:
+ def into_view(self, temporary: bool = False) -> Table:
"""Convert ``DataFrame`` into a :class:`~datafusion.Table`.
Examples:
@@ -332,7 +332,7 @@ class DataFrame:
"""
from datafusion.catalog import Table as _Table
- return _Table(self.df.into_view())
+ return _Table(self.df.into_view(temporary))
def __getitem__(self, key: str | list[str]) -> DataFrame:
"""Return a new :py:class`DataFrame` with the specified column or
columns.
diff --git a/python/tests/test_context.py b/python/tests/test_context.py
index 94d1e6a3..bd65305e 100644
--- a/python/tests/test_context.py
+++ b/python/tests/test_context.py
@@ -357,10 +357,16 @@ def test_register_table_from_dataframe(ctx):
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
-def test_register_table_from_dataframe_into_view(ctx):
+@pytest.mark.parametrize("temporary", [True, False])
+def test_register_table_from_dataframe_into_view(ctx, temporary):
df = ctx.from_pydict({"a": [1, 2]})
- table = df.into_view()
+ table = df.into_view(temporary=temporary)
assert isinstance(table, Table)
+ if temporary:
+ assert table.kind == "temporary"
+ else:
+ assert table.kind == "view"
+
ctx.register_table("view_tbl", table)
result = ctx.sql("SELECT * FROM view_tbl").collect()
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 1f87f99d..df3e9d31 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -28,6 +28,7 @@ use arrow::pyarrow::FromPyArrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
use datafusion::arrow::util::pretty;
+use datafusion::catalog::TableProvider;
use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
@@ -49,7 +50,7 @@ use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
-use crate::table::PyTable;
+use crate::table::{PyTable, TempViewTable};
use crate::utils::{
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value,
validate_pycapsule, wait_for_future,
};
@@ -420,11 +421,15 @@ impl PyDataFrame {
/// because we're working with Python bindings
/// where objects are shared
#[allow(clippy::wrong_self_convention)]
- pub fn into_view(&self) -> PyDataFusionResult<PyTable> {
- // Call the underlying Rust DataFrame::into_view method.
- // Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
- // so that we don't invalidate this PyDataFrame.
- let table_provider = self.df.as_ref().clone().into_view();
+ pub fn into_view(&self, temporary: bool) -> PyDataFusionResult<PyTable> {
+ let table_provider = if temporary {
+ Arc::new(TempViewTable::new(Arc::clone(&self.df))) as Arc<dyn TableProvider>
+ } else {
+ // Call the underlying Rust DataFrame::into_view method.
+ // Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
+ // so that we don't invalidate this PyDataFrame.
+ self.df.as_ref().clone().into_view()
+ };
Ok(PyTable::from(table_provider))
}
diff --git a/src/table.rs b/src/table.rs
index b830f776..fdca4d3e 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -15,15 +15,22 @@
// specific language governing permissions and limitations
// under the License.
+use crate::dataframe::PyDataFrame;
+use crate::dataset::Dataset;
+use crate::utils::table_provider_from_pycapsule;
+use arrow::datatypes::SchemaRef;
use arrow::pyarrow::ToPyArrow;
+use async_trait::async_trait;
+use datafusion::catalog::Session;
+use datafusion::common::Column;
use datafusion::datasource::{TableProvider, TableType};
+use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::DataFrame;
use pyo3::prelude::*;
+use std::any::Any;
use std::sync::Arc;
-use crate::dataframe::PyDataFrame;
-use crate::dataset::Dataset;
-use crate::utils::table_provider_from_pycapsule;
-
/// This struct is used as a common method for all TableProviders,
/// whether they refer to an FFI provider, an internally known
/// implementation, a dataset, or a dataframe view.
@@ -104,3 +111,82 @@ impl From<Arc<dyn TableProvider>> for PyTable {
Self { table }
}
}
+
+#[derive(Clone, Debug)]
+pub(crate) struct TempViewTable {
+ df: Arc<DataFrame>,
+}
+
+/// This is nearly identical to `DataFrameTableProvider`
+/// except that it is for temporary tables.
+/// Remove when https://github.com/apache/datafusion/issues/18026
+/// closes.
+impl TempViewTable {
+ pub(crate) fn new(df: Arc<DataFrame>) -> Self {
+ Self { df }
+ }
+}
+
+#[async_trait]
+impl TableProvider for TempViewTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::new(self.df.schema().into())
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Temporary
+ }
+
+ async fn scan(
+ &self,
+ state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ filters: &[Expr],
+ limit: Option<usize>,
+ ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+ let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
+ let plan = self.df.logical_plan().clone();
+ let mut plan = LogicalPlanBuilder::from(plan);
+
+ if let Some(filter) = filter {
+ plan = plan.filter(filter)?;
+ }
+
+ let mut plan = if let Some(projection) = projection {
+ // avoiding adding a redundant projection (e.g. SELECT * FROM view)
+ let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
+ if projection == &current_projection {
+ plan
+ } else {
+ let fields: Vec<Expr> = projection
+ .iter()
+ .map(|i| {
+ Expr::Column(Column::from(
+ self.df.logical_plan().schema().qualified_field(*i),
+ ))
+ })
+ .collect();
+ plan.project(fields)?
+ }
+ } else {
+ plan
+ };
+
+ if let Some(limit) = limit {
+ plan = plan.limit(0, Some(limit))?;
+ }
+
+ state.create_physical_plan(&plan.build()?).await
+ }
+
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
+ Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]