This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new be42f3d17c Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled (#12135)
be42f3d17c is described below
commit be42f3d17c792cd31d49ce06a9a95d87b76ef94a
Author: June <[email protected]>
AuthorDate: Tue Sep 17 10:23:58 2024 -0600
Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled (#12135)
* feat: Preemptively filter for pushdown-preventing columns in ListingTable
* Fix behavior to make all previous tests work and lay groundwork for
future tests
* fix: Add some more tests and fix small issue with pushdown specificity
* test: Revive unnecessarily removed test
* ci: Fix CI issues with different combinations of exprs
* fix: run fmt
* Fix doc publicity issues
* Add ::new fn for PushdownChecker
* Remove unnecessary 'pub' qualifier
* Fix naming and doc comment of non_pushdown_columns to reflect what it
actually does (the opposite) and add back useful comments
* fmt
* Extend FileFormat trait to allow library users to define formats which
support pushdown
* fmt
* fix: reference real fn in doc to fix CI
* Minor: Add tests for using FilterExec when parquet was pushed down
* Update datafusion/core/src/datasource/file_format/mod.rs
* Pipe schema information through to TableScan and ParquetExec to facilitate removal of unnecessary FilterExecs
* - Remove collect::<(_, _)> to satisfy msrv
- Remove expect(_) attr to satisfy msrv
- Update comments with more accurate details and explanations
* Add more details in comments for `map_partial_batch`
Co-authored-by: Andrew Lamb <[email protected]>
* Remove reference to issue #4028 as it will be closed
* Convert normal comments to doc-comments
Co-authored-by: Andrew Lamb <[email protected]>
* Clarify meaning of word `projected` in comment
Co-authored-by: Andrew Lamb <[email protected]>
* Clarify more how `table_schema` is used differently from
`projected_table_schema`
Co-authored-by: Andrew Lamb <[email protected]>
* Finish partially-written comment about SchemaMapping struct
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/file_format/mod.rs | 28 ++
.../core/src/datasource/file_format/parquet.rs | 28 +-
datafusion/core/src/datasource/listing/helpers.rs | 110 ++++----
datafusion/core/src/datasource/listing/table.rs | 71 +++--
.../datasource/physical_plan/file_scan_config.rs | 2 +-
.../core/src/datasource/physical_plan/mod.rs | 5 +-
.../src/datasource/physical_plan/parquet/mod.rs | 4 +-
.../src/datasource/physical_plan/parquet/opener.rs | 4 +-
.../datasource/physical_plan/parquet/row_filter.rs | 312 ++++++++++++++++-----
datafusion/core/src/datasource/schema_adapter.rs | 147 ++++++++--
datafusion/core/src/physical_planner.rs | 2 +-
datafusion/expr-common/src/signature.rs | 4 +-
datafusion/expr/src/logical_plan/plan.rs | 14 +-
.../optimizer/src/optimize_projections/mod.rs | 12 +-
.../src/optimize_projections/required_indices.rs | 15 +-
datafusion/physical-expr/src/expressions/binary.rs | 4 +-
datafusion/physical-expr/src/expressions/column.rs | 5 +-
datafusion/proto/src/logical_plan/mod.rs | 15 +-
datafusion/proto/src/physical_plan/mod.rs | 116 ++++----
.../join_disable_repartition_joins.slt.temp | 26 ++
.../test_files/parquet_filter_pushdown.slt | 28 +-
datafusion/sqllogictest/test_files/repartition.slt | 2 +-
datafusion/sqllogictest/test_files/select.slt | 2 +-
datafusion/sqllogictest/test_files/string_view.slt | 1 -
datafusion/sqllogictest/test_files/window.slt | 2 +-
25 files changed, 662 insertions(+), 297 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 1dcf480cf4..a503e36adb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -45,6 +45,7 @@ use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
+use datafusion_expr::Expr;
use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
@@ -138,6 +139,33 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
+
+    /// Check if the specified file format has support for pushing down the provided filters within
+    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
+ fn supports_filters_pushdown(
+ &self,
+ _file_schema: &Schema,
+ _table_schema: &Schema,
+ _filters: &[&Expr],
+ ) -> Result<FilePushdownSupport> {
+ Ok(FilePushdownSupport::NoSupport)
+ }
+}
+
+/// An enum to distinguish between different states when determining if certain filters can be
+/// pushed down to file scanning
+#[derive(Debug, PartialEq)]
+pub enum FilePushdownSupport {
+    /// The file format/system being asked does not support any sort of pushdown. This should be
+    /// used even if the file format theoretically supports some sort of pushdown, but it's not
+    /// enabled or implemented yet.
+    NoSupport,
+    /// The file format/system being asked *does* support pushdown, but it can't make it work for
+    /// the provided filter/expression
+    NotSupportedForFilter,
+    /// The file format/system being asked *does* support pushdown and *can* make it work for the
+    /// provided filter/expression
+    Supported,
}
/// A container of [FileFormatFactory] which also implements [FileType].
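
The new FileFormat::supports_filters_pushdown hook defaults to NoSupport, so existing formats are
unaffected. A minimal sketch in Rust of the decision a custom format might make when overriding
it; the helper name and the per-column check are hypothetical, and the import paths assume the
`datafusion` facade re-exports of the items shown above:

use datafusion::arrow::datatypes::Schema;
use datafusion::common::Result;
use datafusion::datasource::file_format::FilePushdownSupport;
use datafusion::logical_expr::Expr;

// Hypothetical helper mirroring what a custom `FileFormat::supports_filters_pushdown`
// override might do; `pushdown_enabled` stands in for a format-specific config flag.
fn my_format_supports_filters_pushdown(
    pushdown_enabled: bool,
    file_schema: &Schema,
    _table_schema: &Schema,
    filters: &[&Expr],
) -> Result<FilePushdownSupport> {
    if !pushdown_enabled {
        // Pushdown not enabled (or not implemented yet): report NoSupport.
        return Ok(FilePushdownSupport::NoSupport);
    }
    // Placeholder check: accept only filters whose column references all exist
    // in the file schema. A real format would apply its own rules here.
    let all_supported = filters.iter().all(|filter| {
        filter
            .column_refs()
            .iter()
            .all(|col| file_schema.field_with_name(&col.name).is_ok())
    });
    Ok(if all_supported {
        FilePushdownSupport::Supported
    } else {
        FilePushdownSupport::NotSupportedForFilter
    })
}

ListingTable consults this result per filter, as shown in the listing/table.rs hunk further below.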
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 2a862dd6dc..35296b0d79 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,7 +26,7 @@ use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
- FileFormatFactory, FileScanConfig,
+ FileFormatFactory, FilePushdownSupport, FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -53,6 +53,7 @@ use datafusion_common::{
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool,
MemoryReservation};
use datafusion_execution::TaskContext;
+use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
@@ -78,7 +79,9 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
-use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
+use crate::datasource::physical_plan::parquet::{
+ can_expr_be_pushed_down_with_schemas, ParquetExecBuilder,
+};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
@@ -414,6 +417,27 @@ impl FileFormat for ParquetFormat {
order_requirements,
)) as _)
}
+
+ fn supports_filters_pushdown(
+ &self,
+ file_schema: &Schema,
+ table_schema: &Schema,
+ filters: &[&Expr],
+ ) -> Result<FilePushdownSupport> {
+ if !self.options().global.pushdown_filters {
+ return Ok(FilePushdownSupport::NoSupport);
+ }
+
+ let all_supported = filters.iter().all(|filter| {
+ can_expr_be_pushed_down_with_schemas(filter, file_schema,
table_schema)
+ });
+
+ Ok(if all_supported {
+ FilePushdownSupport::Supported
+ } else {
+ FilePushdownSupport::NotSupportedForFilter
+ })
+ }
}
/// Fetches parquet metadata from ObjectStore for given object
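
As the hunk above shows, ParquetFormat only reports Supported when `pushdown_filters` is enabled
in its options. A sketch of turning it on through the session config so pushed-down parquet
filters become Exact; the table name and file path are placeholders, and the key is the standard
`datafusion.execution.parquet.pushdown_filters` option:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Opt in to parquet filter pushdown; with this commit, ListingTable then
    // reports such filters as Exact and the planner drops the extra FilterExec.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("t_pushdown", "data/t.parquet", ParquetReadOptions::default())
        .await?;
    // The plan for this query should contain no FilterExec; the predicate is
    // evaluated inside ParquetExec instead.
    ctx.sql("EXPLAIN SELECT a FROM t_pushdown WHERE b > 2")
        .await?
        .show()
        .await?;
    Ok(())
}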
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index 33a16237e1..72d7277d6a 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -53,66 +53,64 @@ use object_store::{ObjectMeta, ObjectStore};
/// was performed
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
- expr.apply(|expr| {
- match expr {
- Expr::Column(Column { ref name, .. }) => {
- is_applicable &= col_names.contains(&name.as_str());
- if is_applicable {
- Ok(TreeNodeRecursion::Jump)
- } else {
- Ok(TreeNodeRecursion::Stop)
- }
+ expr.apply(|expr| match expr {
+ Expr::Column(Column { ref name, .. }) => {
+ is_applicable &= col_names.contains(&name.as_str());
+ if is_applicable {
+ Ok(TreeNodeRecursion::Jump)
+ } else {
+ Ok(TreeNodeRecursion::Stop)
}
- Expr::Literal(_)
- | Expr::Alias(_)
- | Expr::OuterReferenceColumn(_, _)
- | Expr::ScalarVariable(_, _)
- | Expr::Not(_)
- | Expr::IsNotNull(_)
- | Expr::IsNull(_)
- | Expr::IsTrue(_)
- | Expr::IsFalse(_)
- | Expr::IsUnknown(_)
- | Expr::IsNotTrue(_)
- | Expr::IsNotFalse(_)
- | Expr::IsNotUnknown(_)
- | Expr::Negative(_)
- | Expr::Cast { .. }
- | Expr::TryCast { .. }
- | Expr::BinaryExpr { .. }
- | Expr::Between { .. }
- | Expr::Like { .. }
- | Expr::SimilarTo { .. }
- | Expr::InList { .. }
- | Expr::Exists { .. }
- | Expr::InSubquery(_)
- | Expr::ScalarSubquery(_)
- | Expr::GroupingSet(_)
- | Expr::Case { .. } => Ok(TreeNodeRecursion::Continue),
-
- Expr::ScalarFunction(scalar_function) => {
- match scalar_function.func.signature().volatility {
- Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
- // TODO: Stable functions could be `applicable`, but that
would require access to the context
- Volatility::Stable | Volatility::Volatile => {
- is_applicable = false;
- Ok(TreeNodeRecursion::Stop)
- }
+ }
+ Expr::Literal(_)
+ | Expr::Alias(_)
+ | Expr::OuterReferenceColumn(_, _)
+ | Expr::ScalarVariable(_, _)
+ | Expr::Not(_)
+ | Expr::IsNotNull(_)
+ | Expr::IsNull(_)
+ | Expr::IsTrue(_)
+ | Expr::IsFalse(_)
+ | Expr::IsUnknown(_)
+ | Expr::IsNotTrue(_)
+ | Expr::IsNotFalse(_)
+ | Expr::IsNotUnknown(_)
+ | Expr::Negative(_)
+ | Expr::Cast(_)
+ | Expr::TryCast(_)
+ | Expr::BinaryExpr(_)
+ | Expr::Between(_)
+ | Expr::Like(_)
+ | Expr::SimilarTo(_)
+ | Expr::InList(_)
+ | Expr::Exists(_)
+ | Expr::InSubquery(_)
+ | Expr::ScalarSubquery(_)
+ | Expr::GroupingSet(_)
+ | Expr::Case(_) => Ok(TreeNodeRecursion::Continue),
+
+ Expr::ScalarFunction(scalar_function) => {
+ match scalar_function.func.signature().volatility {
+ Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
+ // TODO: Stable functions could be `applicable`, but that
would require access to the context
+ Volatility::Stable | Volatility::Volatile => {
+ is_applicable = false;
+ Ok(TreeNodeRecursion::Stop)
}
}
+ }
- // TODO other expressions are not handled yet:
- // - AGGREGATE and WINDOW should not end up in filter conditions,
except maybe in some edge cases
- // - Can `Wildcard` be considered as a `Literal`?
- // - ScalarVariable could be `applicable`, but that would require
access to the context
- Expr::AggregateFunction { .. }
- | Expr::WindowFunction { .. }
- | Expr::Wildcard { .. }
- | Expr::Unnest { .. }
- | Expr::Placeholder(_) => {
- is_applicable = false;
- Ok(TreeNodeRecursion::Stop)
- }
+ // TODO other expressions are not handled yet:
+ // - AGGREGATE and WINDOW should not end up in filter conditions,
except maybe in some edge cases
+ // - Can `Wildcard` be considered as a `Literal`?
+ // - ScalarVariable could be `applicable`, but that would require
access to the context
+ Expr::AggregateFunction { .. }
+ | Expr::WindowFunction { .. }
+ | Expr::Wildcard { .. }
+ | Expr::Unnest { .. }
+ | Expr::Placeholder(_) => {
+ is_applicable = false;
+ Ok(TreeNodeRecursion::Stop)
}
})
.unwrap();
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index adf907011b..3541a8ff21 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -18,16 +18,17 @@
//! The table implementation.
use std::collections::HashMap;
-use std::str::FromStr;
-use std::{any::Any, sync::Arc};
+use std::{any::Any, str::FromStr, sync::Arc};
use super::helpers::{expr_applicable_for_cols, pruned_partition_list,
split_files};
-use super::PartitionedFile;
+use super::{ListingTableUrl, PartitionedFile};
-use super::ListingTableUrl;
-use crate::datasource::{create_ordering, get_statistics_with_limit};
use crate::datasource::{
- file_format::{file_compression_type::FileCompressionType, FileFormat},
+ create_ordering,
+ file_format::{
+ file_compression_type::FileCompressionType, FileFormat,
FilePushdownSupport,
+ },
+ get_statistics_with_limit,
physical_plan::{FileScanConfig, FileSinkConfig},
};
use crate::execution::context::SessionState;
@@ -43,8 +44,9 @@ use datafusion_common::{
config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
SchemaExt, ToDFSchema,
};
-use datafusion_execution::cache::cache_manager::FileStatisticsCache;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::cache::{
+ cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
+};
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
@@ -817,19 +819,22 @@ impl TableProvider for ListingTable {
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;
- let filters = if let Some(expr) = conjunction(filters.to_vec()) {
- // NOTE: Use the table schema (NOT file schema) here because
`expr` may contain references to partition columns.
- let table_df_schema =
self.table_schema.as_ref().clone().to_dfschema()?;
- let filters =
- create_physical_expr(&expr, &table_df_schema,
state.execution_props())?;
- Some(filters)
- } else {
- None
- };
+ let filters = conjunction(filters.to_vec())
+ .map(|expr| -> Result<_> {
+ // NOTE: Use the table schema (NOT file schema) here because
`expr` may contain references to partition columns.
+ let table_df_schema =
self.table_schema.as_ref().clone().to_dfschema()?;
+ let filters = create_physical_expr(
+ &expr,
+ &table_df_schema,
+ state.execution_props(),
+ )?;
+ Ok(Some(filters))
+ })
+ .unwrap_or(Ok(None))?;
- let object_store_url = if let Some(url) = self.table_paths.first() {
- url.object_store()
- } else {
+ let Some(object_store_url) =
+ self.table_paths.first().map(ListingTableUrl::object_store)
+ else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
@@ -854,7 +859,7 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
- Ok(filters
+ filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
@@ -862,19 +867,29 @@ impl TableProvider for ListingTable {
.options
.table_partition_cols
.iter()
- .map(|x| x.0.as_str())
+ .map(|col| col.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partition pruning, it is
exact
- TableProviderFilterPushDown::Exact
- } else {
- // otherwise, we still might be able to handle the filter
with file
- // level mechanisms such as Parquet row group pruning.
- TableProviderFilterPushDown::Inexact
+ return Ok(TableProviderFilterPushDown::Exact);
+ }
+
+                // if we can't push it down completely with only the filename-based/path-based
+                // column names, then we should check if we can do parquet predicate pushdown
+ let supports_pushdown =
self.options.format.supports_filters_pushdown(
+ &self.file_schema,
+ &self.table_schema,
+ &[filter],
+ )?;
+
+ if supports_pushdown == FilePushdownSupport::Supported {
+ return Ok(TableProviderFilterPushDown::Exact);
}
+
+ Ok(TableProviderFilterPushDown::Inexact)
})
- .collect())
+ .collect()
}
fn get_table_definition(&self) -> Option<&str> {
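
Putting the listing-table change together: each filter is now classified by first trying partition
pruning and then asking the format. A condensed, illustrative restatement of that per-filter
decision in Rust (not the actual method body):

use datafusion::datasource::file_format::FilePushdownSupport;
use datafusion::logical_expr::TableProviderFilterPushDown;

// Condensed restatement of the per-filter decision ListingTable now makes
// (illustrative only; the real method also consults the partition columns).
fn decide(
    handled_by_partition_pruning: bool,
    format_support: FilePushdownSupport,
) -> TableProviderFilterPushDown {
    if handled_by_partition_pruning {
        // Partition pruning removes whole files, so the filter is Exact.
        return TableProviderFilterPushDown::Exact;
    }
    if format_support == FilePushdownSupport::Supported {
        // e.g. ParquetFormat with pushdown_filters enabled: rows are filtered
        // during the scan, so no FilterExec is needed afterwards.
        return TableProviderFilterPushDown::Exact;
    }
    // Otherwise only metadata-level pruning applies, so keep a FilterExec.
    TableProviderFilterPushDown::Inexact
}

fn main() {
    // A parquet scan with pushdown_filters enabled reports Exact.
    assert_eq!(
        decide(false, FilePushdownSupport::Supported),
        TableProviderFilterPushDown::Exact
    );
}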
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 9f67418569..2c438e8b0e 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -258,7 +258,7 @@ impl FileScanConfig {
(projected_schema, table_stats, projected_output_ordering)
}
- #[allow(unused)] // Only used by avro
+ #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index f810fb86bd..4018b3bb29 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -516,7 +516,8 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));
- let adapter =
DefaultSchemaAdapterFactory::default().create(table_schema.clone());
+ let adapter = DefaultSchemaAdapterFactory
+ .create(table_schema.clone(), table_schema.clone());
let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
@@ -573,7 +574,7 @@ mod tests {
let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
- let adapter = DefaultSchemaAdapterFactory::default().create(schema);
+ let adapter = DefaultSchemaAdapterFactory.create(schema,
table_schema.clone());
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 54d4d7262a..f22d02699a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -61,6 +61,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use writer::plan_to_parquet;
/// Execution plan for reading one or more Parquet files.
@@ -405,6 +406,7 @@ impl ParquetExecBuilder {
let (projected_schema, projected_statistics,
projected_output_ordering) =
base_config.project();
+
let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
@@ -707,7 +709,7 @@ impl ExecutionPlan for ParquetExec {
let schema_adapter_factory = self
.schema_adapter_factory
.clone()
- .unwrap_or_else(||
Arc::new(DefaultSchemaAdapterFactory::default()));
+ .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
let opener = ParquetOpener {
partition_index,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 2a198c3d45..9880c30ddb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -99,7 +99,9 @@ impl FileOpener for ParquetOpener {
let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
- let schema_adapter =
self.schema_adapter_factory.create(projected_schema);
+ let schema_adapter = self
+ .schema_adapter_factory
+ .create(projected_schema, self.table_schema.clone());
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 59d23fd68c..d3bc8030cf 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -76,7 +76,7 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
TreeNodeRewriter,
};
-use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result,
ScalarValue};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_predicate_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
@@ -237,12 +237,6 @@ struct FilterCandidateBuilder<'a> {
/// The schema of the table (merged schema) -- columns may be in different
/// order than in the file and have columns that are not in the file schema
table_schema: &'a Schema,
- required_column_indices: BTreeSet<usize>,
- /// Does the expression require any non-primitive columns (like structs)?
- non_primitive_columns: bool,
- /// Does the expression reference any columns that are in the table
- /// schema but not in the file schema?
- projected_columns: bool,
}
impl<'a> FilterCandidateBuilder<'a> {
@@ -255,9 +249,6 @@ impl<'a> FilterCandidateBuilder<'a> {
expr,
file_schema,
table_schema,
- required_column_indices: BTreeSet::default(),
- non_primitive_columns: false,
- projected_columns: false,
}
}
@@ -268,53 +259,87 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices,
metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+ pub fn build(self, metadata: &ParquetMetaData) ->
Result<Option<FilterCandidate>> {
+ let Some((required_indices, rewritten_expr)) =
+ pushdown_columns(self.expr, self.file_schema, self.table_schema)?
+ else {
+ return Ok(None);
+ };
+
+ let required_bytes = size_of_columns(&required_indices, metadata)?;
+ let can_use_index = columns_sorted(&required_indices, metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection: required_indices.into_iter().collect(),
+ }))
+ }
+}
+
+// A struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to
+// determine if any column references in the expression would prevent it from being pushed down
+// as a predicate. If non_primitive_columns || projected_columns, it can't be pushed down.
+// It can't be reused between calls to `rewrite`; each instance must be used only once.
+struct PushdownChecker<'schema> {
+    /// Does the expression require any non-primitive columns (like structs)?
+    non_primitive_columns: bool,
+    /// Does the expression reference any columns that are in the table
+    /// schema but not in the file schema?
+    projected_columns: bool,
+    // The indices of all the columns found within the given expression which exist inside the
+    // given [`file_schema`]
+    required_column_indices: BTreeSet<usize>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+ fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) ->
Self {
+ Self {
+ non_primitive_columns: false,
+ projected_columns: false,
+ required_column_indices: BTreeSet::default(),
+ file_schema,
+ table_schema,
}
}
+
+ fn check_single_column(&mut self, column_name: &str) ->
Option<TreeNodeRecursion> {
+ if let Ok(idx) = self.file_schema.index_of(column_name) {
+ self.required_column_indices.insert(idx);
+
+ if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+ self.non_primitive_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+ } else if self.table_schema.index_of(column_name).is_err() {
+ // If the column does not exist in the (un-projected) table schema
then
+ // it must be a projected column.
+ self.projected_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+
+ None
+ }
+
+ #[inline]
+ fn prevents_pushdown(&self) -> bool {
+ self.non_primitive_columns || self.projected_columns
+ }
}
-/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that
-/// walks the expression tree and rewrites it in preparation of becoming
-/// `FilterCandidate`.
-impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
+impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
type Node = Arc<dyn PhysicalExpr>;
- /// Called before visiting each child
fn f_down(
&mut self,
node: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
- if let Ok(idx) = self.file_schema.index_of(column.name()) {
- self.required_column_indices.insert(idx);
-
- if
DataType::is_nested(self.file_schema.field(idx).data_type()) {
- self.non_primitive_columns = true;
- return Ok(Transformed::new(node, false,
TreeNodeRecursion::Jump));
- }
- } else if self.table_schema.index_of(column.name()).is_err() {
- // If the column does not exist in the (un-projected) table
schema then
- // it must be a projected column.
- self.projected_columns = true;
- return Ok(Transformed::new(node, false,
TreeNodeRecursion::Jump));
+ if let Some(recursion) = self.check_single_column(column.name()) {
+ return Ok(Transformed::new(node, false, recursion));
}
}
@@ -322,29 +347,30 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
}
/// After visiting all children, rewrite column references to nulls if
- /// they are not in the file schema
+ /// they are not in the file schema.
+    /// We do this because they won't be relevant if they're not in the file schema, since the
+    /// file schema is the only thing we're dealing with here: this is only used for the parquet
+    /// pushdown during scanning
fn f_up(
&mut self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- // if the expression is a column, is it in the file schema?
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+ // if the expression is a column, is it in the file schema?
if self.file_schema.field_with_name(column.name()).is_err() {
- // Replace the column reference with a NULL (using the type
from the table schema)
- // e.g. `column = 'foo'` is rewritten be transformed to `NULL
= 'foo'`
- //
- // See comments on `FilterCandidateBuilder` for more
information
- return match self.table_schema.field_with_name(column.name()) {
- Ok(field) => {
- // return the null value corresponding to the data type
+ return self
+ .table_schema
+ .field_with_name(column.name())
+ .and_then(|field| {
+                    // Replace the column reference with a NULL (using the type from the table schema)
+                    // e.g. `column = 'foo'` is rewritten to `NULL = 'foo'`
+ //
+ // See comments on `FilterCandidateBuilder` for more
information
let null_value =
ScalarValue::try_from(field.data_type())?;
-
Ok(Transformed::yes(Arc::new(Literal::new(null_value))))
- }
- Err(e) => {
- // If the column is not in the table schema, should
throw the error
- arrow_err!(e)
- }
- };
+ Ok(Transformed::yes(Arc::new(Literal::new(null_value))
as _))
+ })
+ // If the column is not in the table schema, should throw
the error
+ .map_err(|e| arrow_datafusion_err!(e));
}
}
@@ -352,6 +378,69 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
}
}
+type ProjectionAndExpr = (BTreeSet<usize>, Arc<dyn PhysicalExpr>);
+
+// Checks if a given expression can be pushed down into `ParquetExec` as opposed to being
+// evaluated post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns all the
+// columns in the given expression so that they can be used in the parquet scanning, along with
+// the expression rewritten as defined in [`PushdownChecker::f_up`]
+fn pushdown_columns(
+ expr: Arc<dyn PhysicalExpr>,
+ file_schema: &Schema,
+ table_schema: &Schema,
+) -> Result<Option<ProjectionAndExpr>> {
+ let mut checker = PushdownChecker::new(file_schema, table_schema);
+
+ let expr = expr.rewrite(&mut checker).data()?;
+
+    Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, expr)))
+}
+
+/// Creates a PushdownChecker for a single use to check a given column with the given schemas. Used
+/// to check preemptively if a column name would prevent pushdown.
+/// Effectively does the inverse of what [`pushdown_columns`] does, but with a single given column
+/// (instead of traversing the entire tree to determine this)
+fn would_column_prevent_pushdown(
+ column_name: &str,
+ file_schema: &Schema,
+ table_schema: &Schema,
+) -> bool {
+ let mut checker = PushdownChecker::new(file_schema, table_schema);
+
+    // The return value of this is only used by [`PushdownChecker::f_down()`], so we can safely
+    // ignore it here. The explicit type annotation just pins the return type so that nobody
+    // accidentally changes it and has it silently ignored here.
+    let _: Option<TreeNodeRecursion> = checker.check_single_column(column_name);
+
+ // and then return a value based on the state of the checker
+ checker.prevents_pushdown()
+}
+
+/// Recurses through expr as a tree, finds all `column`s, and checks if any of them would prevent
+/// this expression from being predicate pushed down. If any of them would, this returns false.
+/// Otherwise, true.
+pub fn can_expr_be_pushed_down_with_schemas(
+ expr: &datafusion_expr::Expr,
+ file_schema: &Schema,
+ table_schema: &Schema,
+) -> bool {
+ let mut can_be_pushed = true;
+ expr.apply(|expr| match expr {
+ datafusion_expr::Expr::Column(column) => {
+ can_be_pushed &=
+ !would_column_prevent_pushdown(column.name(), file_schema,
table_schema);
+ Ok(if can_be_pushed {
+ TreeNodeRecursion::Jump
+ } else {
+ TreeNodeRecursion::Stop
+ })
+ }
+ _ => Ok(TreeNodeRecursion::Continue),
+ })
+ .unwrap(); // we never return an Err, so we can safely unwrap this
+ can_be_pushed
+}
+
/// Computes the projection required to go from the file's schema order to the
projected
/// order expected by this filter
///
@@ -444,11 +533,13 @@ pub fn build_row_filter(
// Determine which conjuncts can be evaluated as ArrowPredicates, if any
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
- .flat_map(|expr| {
+ .map(|expr| {
FilterCandidateBuilder::new(expr.clone(), file_schema,
table_schema)
.build(metadata)
- .unwrap_or_default()
})
+ .collect::<Result<Vec<_>, _>>()?
+ .into_iter()
+ .flatten()
.collect();
// no candidates
@@ -485,11 +576,12 @@ pub fn build_row_filter(
#[cfg(test)]
mod test {
use super::*;
- use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
- use crate::datasource::schema_adapter::SchemaAdapterFactory;
+ use crate::datasource::schema_adapter::{
+ DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+ };
use arrow::datatypes::Field;
- use arrow_schema::TimeUnit::Nanosecond;
+ use arrow_schema::{Fields, TimeUnit::Nanosecond};
use datafusion_expr::{cast, col, lit, Expr};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::metrics::{Count, Time};
@@ -583,8 +675,9 @@ mod test {
false,
)]);
+ let table_ref = Arc::new(table_schema.clone());
let schema_adapter =
- DefaultSchemaAdapterFactory
{}.create(Arc::new(table_schema.clone()));
+ DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref),
table_ref);
let (schema_mapping, _) = schema_adapter
.map_schema(&file_schema)
.expect("creating schema mapping");
@@ -661,4 +754,87 @@ mod test {
assert_eq!(projection, remapped)
}
}
+
+ #[test]
+ fn nested_data_structures_prevent_pushdown() {
+ let table_schema = get_basic_table_schema();
+
+ let file_schema = Schema::new(vec![Field::new(
+ "list_col",
+ DataType::Struct(Fields::empty()),
+ true,
+ )]);
+
+ let expr = col("list_col").is_not_null();
+
+ assert!(!can_expr_be_pushed_down_with_schemas(
+ &expr,
+ &file_schema,
+ &table_schema
+ ));
+ }
+
+ #[test]
+ fn projected_columns_prevent_pushdown() {
+ let table_schema = get_basic_table_schema();
+
+ let file_schema =
+ Schema::new(vec![Field::new("existing_col", DataType::Int64,
true)]);
+
+ let expr = col("nonexistent_column").is_null();
+
+ assert!(!can_expr_be_pushed_down_with_schemas(
+ &expr,
+ &file_schema,
+ &table_schema
+ ));
+ }
+
+ #[test]
+ fn basic_expr_doesnt_prevent_pushdown() {
+ let table_schema = get_basic_table_schema();
+
+ let file_schema = Schema::new(vec![Field::new("str_col",
DataType::Utf8, true)]);
+
+ let expr = col("str_col").is_null();
+
+ assert!(can_expr_be_pushed_down_with_schemas(
+ &expr,
+ &file_schema,
+ &table_schema
+ ));
+ }
+
+ #[test]
+ fn complex_expr_doesnt_prevent_pushdown() {
+ let table_schema = get_basic_table_schema();
+
+ let file_schema = Schema::new(vec![
+ Field::new("str_col", DataType::Utf8, true),
+ Field::new("int_col", DataType::UInt64, true),
+ ]);
+
+ let expr = col("str_col")
+ .is_not_null()
+
.or(col("int_col").gt(Expr::Literal(ScalarValue::UInt64(Some(5)))));
+
+ assert!(can_expr_be_pushed_down_with_schemas(
+ &expr,
+ &file_schema,
+ &table_schema
+ ));
+ }
+
+ fn get_basic_table_schema() -> Schema {
+ let testdata = crate::test_util::parquet_test_data();
+ let file =
std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
+ .expect("opening file");
+
+ let reader = SerializedFileReader::new(file).expect("creating reader");
+
+ let metadata = reader.metadata();
+
+ parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+ .expect("parsing schema")
+ }
}
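
Since `can_expr_be_pushed_down_with_schemas` is now re-exported from the parquet physical-plan
module, callers outside this file can run the same check the tests above exercise. A small usage
sketch, assuming the public re-export path shown in the mod.rs hunk earlier:

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::can_expr_be_pushed_down_with_schemas;
use datafusion::prelude::{col, lit};

fn main() {
    let file_schema = Schema::new(vec![Field::new("b", DataType::Int32, true)]);
    let table_schema = file_schema.clone();

    // A filter over a primitive column present in the file can be pushed down.
    let ok = col("b").gt(lit(2));
    assert!(can_expr_be_pushed_down_with_schemas(&ok, &file_schema, &table_schema));

    // A filter over a column missing from both schemas cannot.
    let missing = col("not_in_file").is_null();
    assert!(!can_expr_be_pushed_down_with_schemas(&missing, &file_schema, &table_schema));
}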
diff --git a/datafusion/core/src/datasource/schema_adapter.rs
b/datafusion/core/src/datasource/schema_adapter.rs
index de508f2c34..fdf3381758 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -35,7 +35,13 @@ use std::sync::Arc;
/// other than null)
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter`.
- fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+    // The design of this function is mostly modeled on the needs of DefaultSchemaAdapterFactory;
+    // read its implementation docs for the reasoning
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter>;
}
/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a
schema
@@ -96,17 +102,33 @@ pub trait SchemaMapper: Debug + Send + Sync {
/// Implementation of [`SchemaAdapterFactory`] that maps columns by name
/// and casts columns to the expected type.
#[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
- fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
- Box::new(DefaultSchemaAdapter { table_schema })
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter> {
+ Box::new(DefaultSchemaAdapter {
+ projected_table_schema,
+ table_schema,
+ })
}
}
+/// This SchemaAdapter requires both the table schema and the projected table schema because of
+/// the needs of the [`SchemaMapping`] it creates. Read its documentation for more details
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
- /// Schema for the table
+    /// The schema for the table, projected to include only the fields being output (projected) by
+    /// the associated ParquetExec
+    projected_table_schema: SchemaRef,
+ projected_table_schema: SchemaRef,
+ /// The entire table schema for the table we're using this to adapt.
+ ///
+ /// This is used to evaluate any filters pushed down into the scan
+ /// which may refer to columns that are not referred to anywhere
+ /// else in the plan.
table_schema: SchemaRef,
}
@@ -116,7 +138,7 @@ impl SchemaAdapter for DefaultSchemaAdapter {
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) ->
Option<usize> {
- let field = self.table_schema.field(index);
+ let field = self.projected_table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}
@@ -133,11 +155,11 @@ impl SchemaAdapter for DefaultSchemaAdapter {
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
- let mut field_mappings = vec![None; self.table_schema.fields().len()];
+ let mut field_mappings = vec![None;
self.projected_table_schema.fields().len()];
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
- self.table_schema.fields().find(file_field.name())
+ self.projected_table_schema.fields().find(file_field.name())
{
match can_cast_types(file_field.data_type(),
table_field.data_type()) {
true => {
@@ -158,8 +180,9 @@ impl SchemaAdapter for DefaultSchemaAdapter {
Ok((
Arc::new(SchemaMapping {
- table_schema: self.table_schema.clone(),
+ projected_table_schema: self.projected_table_schema.clone(),
field_mappings,
+ table_schema: self.table_schema.clone(),
}),
projection,
))
@@ -168,39 +191,81 @@ impl SchemaAdapter for DefaultSchemaAdapter {
/// The SchemaMapping struct holds a mapping from the file schema to the table
schema
/// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because its different
+/// functions have different needs. The [`map_batch`] function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
+/// to come out of the execution of this query. [`map_partial_batch`], however, is used to create a
+/// RecordBatch with a schema that can be used for Parquet pushdown, meaning that it may contain
+/// fields which are not in the projected schema (as the fields that parquet pushdown filters
+/// operate on can be completely distinct from the fields that are projected (output) out of the
+/// ParquetExec).
+///
+/// [`map_partial_batch`] uses `table_schema` to create the resulting RecordBatch (as it could be
+/// operating on any fields in the schema), while [`map_batch`] uses `projected_table_schema` (as
+/// it can only operate on the projected fields).
+///
+/// [`map_batch`]: Self::map_batch
+/// [`map_partial_batch`]: Self::map_partial_batch
#[derive(Debug)]
pub struct SchemaMapping {
- /// The schema of the table. This is the expected schema after conversion
and it should match the schema of the query result.
- table_schema: SchemaRef,
- /// Mapping from field index in `table_schema` to index in projected
file_schema
+    /// The schema of the table. This is the expected schema after conversion and it should match
+    /// the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
+    /// They are Options instead of just plain `usize`s because the table could have fields that
+    /// don't exist in the file.
+    field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema (which only contains the
+    /// columns that we are projecting out of this query). This contains all fields in the table,
+    /// regardless of whether they will be projected out or not.
+    table_schema: SchemaRef,
}
impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only the projected
+    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+    /// in the projected schema, it would be better to use `map_partial_batch`
fn map_batch(&self, batch: RecordBatch) ->
datafusion_common::Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();
let cols = self
- .table_schema
+ .projected_table_schema
+ // go through each field in the projected schema
.fields()
.iter()
+ // and zip it with the index that maps fields from the projected
table schema to the
+ // projected file schema in `batch`
.zip(&self.field_mappings)
- .map(|(field, file_idx)| match file_idx {
- Some(batch_idx) => cast(&batch_cols[*batch_idx],
field.data_type()),
- None => Ok(new_null_array(field.data_type(), batch_rows)),
+ // and for each one...
+ .map(|(field, file_idx)| {
+ file_idx.map_or_else(
+ // If this field only exists in the table, and not in the
file, then we know
+ // that it's null, so just return that.
+ || Ok(new_null_array(field.data_type(), batch_rows)),
+ // However, if it does exist in both, then try to cast it
to the correct output
+ // type
+ |batch_idx| cast(&batch_cols[batch_idx],
field.data_type()),
+ )
})
.collect::<datafusion_common::Result<Vec<_>, _>>()?;
// Necessary to handle empty batches
let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
- let schema = self.table_schema.clone();
+ let schema = self.projected_table_schema.clone();
let record_batch = RecordBatch::try_new_with_options(schema, cols,
&options)?;
Ok(record_batch)
}
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+    /// contains the fields that exist in both the file schema and table schema.
+ ///
+ /// Unlike `map_batch` this method also preserves the columns that
+ /// may not appear in the final output (`projected_table_schema`) but may
+ /// appear in push down predicates
fn map_partial_batch(
&self,
batch: RecordBatch,
@@ -208,15 +273,33 @@ impl SchemaMapper for SchemaMapping {
let batch_cols = batch.columns().to_vec();
let schema = batch.schema();
- let mut cols = vec![];
- let mut fields = vec![];
- for (i, f) in schema.fields().iter().enumerate() {
- let table_field = self.table_schema.field_with_name(f.name());
- if let Ok(tf) = table_field {
- cols.push(cast(&batch_cols[i], tf.data_type())?);
- fields.push(tf.clone());
- }
- }
+ // for each field in the batch's schema (which is based on a file, not
a table)...
+ let (cols, fields) = schema
+ .fields()
+ .iter()
+ .zip(batch_cols.iter())
+ .flat_map(|(field, batch_col)| {
+ self.table_schema
+ // try to get the same field from the table schema that we
have stored in self
+ .field_with_name(field.name())
+ // and if we don't have it, that's fine, ignore it. This
may occur when we've
+ // created an external table whose fields are a subset of
the fields in this
+ // file, then tried to read data from the file into this
table. If that is the
+ // case here, it's fine to ignore because we don't care
about this field
+ // anyways
+ .ok()
+ // but if we do have it,
+ .map(|table_field| {
+ // try to cast it into the correct output type. we
don't want to ignore this
+ // error, though, so it's propagated.
+ cast(batch_col, table_field.data_type())
+ // and if that works, return the field and column.
+ .map(|new_col| (new_col, table_field.clone()))
+ })
+ })
+ .collect::<Result<Vec<_>, _>>()?
+ .into_iter()
+ .unzip::<_, _, Vec<_>, Vec<_>>();
// Necessary to handle empty batches
let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
@@ -322,12 +405,16 @@ mod tests {
}
#[derive(Debug)]
- struct TestSchemaAdapterFactory {}
+ struct TestSchemaAdapterFactory;
impl SchemaAdapterFactory for TestSchemaAdapterFactory {
- fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ _table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter> {
Box::new(TestSchemaAdapter {
- table_schema: schema,
+ table_schema: projected_table_schema,
})
}
}
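
Because `SchemaAdapterFactory::create` now takes two schemas, downstream implementations need a
signature update. A sketch of a migrated factory; here it simply delegates to
DefaultSchemaAdapterFactory, whereas a real factory would build its own adapter and may ignore the
full `table_schema` if it does not support filters on non-projected columns:

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
};

#[derive(Debug)]
struct MyAdapterFactory;

impl SchemaAdapterFactory for MyAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        // Delegate to the default adapter; the second argument is the full
        // (un-projected) table schema used when evaluating pushed-down filters.
        DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
    }
}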
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index cc35255dfe..2010a5c664 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -429,7 +429,7 @@ impl DefaultPhysicalPlanner {
Ok(Some(plan))
}
- /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan
counterpart.
+ /// Given a single LogicalPlan node, map it to its physical ExecutionPlan
counterpart.
async fn map_logical_node_to_physical(
&self,
node: &LogicalPlan,
diff --git a/datafusion/expr-common/src/signature.rs
b/datafusion/expr-common/src/signature.rs
index ffa5f17cec..d1553b3315 100644
--- a/datafusion/expr-common/src/signature.rs
+++ b/datafusion/expr-common/src/signature.rs
@@ -43,8 +43,8 @@ pub enum Volatility {
Immutable,
/// A stable function may return different values given the same input
across different
/// queries but must return the same value for a given input within a
query. An example of
- /// this is the `Now` function. DataFusion
- /// will attempt to inline `Stable` functions during planning, when
possible.
+ /// this is the `Now` function. DataFusion will attempt to inline `Stable`
functions
+ /// during planning, when possible.
/// For query `select col1, now() from t1`, it might take a while to
execute but
/// `now()` column will be the same for each output row, which is evaluated
/// during planning.
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 1c94c7f3af..b3f9b26fa4 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1980,7 +1980,7 @@ impl LogicalPlan {
.map(|i| &input_columns[*i])
.collect::<Vec<&Column>>();
// get items from input_columns indexed by
list_col_indices
- write!(f, "Unnest: lists[{}] structs[{}]",
+ write!(f, "Unnest: lists[{}] structs[{}]",
expr_vec_fmt!(list_type_columns),
expr_vec_fmt!(struct_type_columns))
}
@@ -2124,11 +2124,13 @@ impl Projection {
/// the `Result` will contain the schema; otherwise, it will contain an error.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) ->
Result<Arc<DFSchema>> {
let metadata = input.schema().metadata().clone();
- let mut schema =
- DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?,
metadata)?;
- schema =
schema.with_functional_dependencies(calc_func_dependencies_for_project(
- exprs, input,
- )?)?;
+
+ let schema =
+ DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?,
metadata)?
+ .with_functional_dependencies(calc_func_dependencies_for_project(
+ exprs, input,
+ )?)?;
+
Ok(Arc::new(schema))
}
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 0623be504b..65db164c6e 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -176,7 +176,7 @@ fn optimize_projections(
let all_exprs_iter =
new_group_bys.iter().chain(new_aggr_expr.iter());
let schema = aggregate.input.schema();
let necessary_indices =
- RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?;
+ RequiredIndicies::new().with_exprs(schema, all_exprs_iter);
let necessary_exprs = necessary_indices.get_required_exprs(schema);
return optimize_projections(
@@ -216,8 +216,7 @@ fn optimize_projections(
// Get all the required column indices at the input, either by the
// parent or window expression requirements.
- let required_indices =
- child_reqs.with_exprs(&input_schema, &new_window_expr)?;
+ let required_indices = child_reqs.with_exprs(&input_schema,
&new_window_expr);
return optimize_projections(
Arc::unwrap_or_clone(window.input),
@@ -269,7 +268,6 @@ fn optimize_projections(
.map(LogicalPlan::TableScan)
.map(Transformed::yes);
}
-
// Other node types are handled below
_ => {}
};
@@ -761,7 +759,7 @@ fn rewrite_projection_given_requirements(
let exprs_used = indices.get_at_indices(&expr);
let required_indices =
- RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?;
+ RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter());
// rewrite the children projection, and if they are changed rewrite the
// projection down
@@ -781,8 +779,8 @@ fn rewrite_projection_given_requirements(
/// - input schema of the projection, output schema of the projection are
same, and
/// - all projection expressions are either Column or Literal
fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) ->
Result<bool> {
- Ok(&projection_schema(input, proj_exprs)? == input.schema()
- && proj_exprs.iter().all(is_expr_trivial))
+ let proj_schema = projection_schema(input, proj_exprs)?;
+ Ok(&proj_schema == input.schema() &&
proj_exprs.iter().all(is_expr_trivial))
}
#[cfg(test)]
diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs
b/datafusion/optimizer/src/optimize_projections/required_indices.rs
index a9a18898c8..60d8ef1a8e 100644
--- a/datafusion/optimizer/src/optimize_projections/required_indices.rs
+++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs
@@ -96,7 +96,7 @@ impl RequiredIndicies {
// Add indices of the child fields referred to by the expressions in
the
// parent
plan.apply_expressions(|e| {
- self.add_expr(schema, e)?;
+ self.add_expr(schema, e);
Ok(TreeNodeRecursion::Continue)
})?;
Ok(self.compact())
@@ -111,7 +111,7 @@ impl RequiredIndicies {
///
/// * `input_schema`: The input schema to analyze for index requirements.
/// * `expr`: An expression for which we want to find necessary field
indices.
- fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) ->
Result<()> {
+ fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) {
// TODO could remove these clones (and visit the expression directly)
let mut cols = expr.column_refs();
// Get outer-referenced (subquery) columns:
@@ -122,7 +122,6 @@ impl RequiredIndicies {
self.indices.push(idx);
}
}
- Ok(())
}
/// Adds the indices of the fields referred to by the given expressions
@@ -136,14 +135,14 @@ impl RequiredIndicies {
self,
schema: &DFSchemaRef,
exprs: impl IntoIterator<Item = &'a Expr>,
- ) -> Result<Self> {
+ ) -> Self {
exprs
.into_iter()
- .try_fold(self, |mut acc, expr| {
- acc.add_expr(schema, expr)?;
- Ok(acc)
+ .fold(self, |mut acc, expr| {
+ acc.add_expr(schema, expr);
+ acc
})
- .map(|acc| acc.compact())
+ .compact()
}
/// Adds all `indices` into this instance.
diff --git a/datafusion/physical-expr/src/expressions/binary.rs
b/datafusion/physical-expr/src/expressions/binary.rs
index e115ec3c74..236b24dd40 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -213,8 +213,8 @@ macro_rules! compute_utf8_flag_op_scalar {
.downcast_ref::<$ARRAYTYPE>()
.expect("compute_utf8_flag_op_scalar failed to downcast array");
- if let
ScalarValue::Utf8(Some(string_value))|ScalarValue::LargeUtf8(Some(string_value))
= $RIGHT {
- let flag = if $FLAG { Some("i") } else { None };
+ if let ScalarValue::Utf8(Some(string_value)) |
ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT {
+ let flag = $FLAG.then_some("i");
let mut array =
paste::expr! {[<$OP _utf8_scalar>]}(&ll, &string_value, flag)?;
if $NOT {
diff --git a/datafusion/physical-expr/src/expressions/column.rs
b/datafusion/physical-expr/src/expressions/column.rs
index bf15821bca..4aad959584 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -163,7 +163,10 @@ impl Column {
internal_err!(
"PhysicalExpr Column references column '{}' at index {}
(zero-based) but input schema only has {} columns: {:?}",
self.name,
- self.index, input_schema.fields.len(),
input_schema.fields().iter().map(|f| f.name().clone()).collect::<Vec<String>>())
+ self.index,
+ input_schema.fields.len(),
+ input_schema.fields().iter().map(|f|
f.name()).collect::<Vec<_>>()
+ )
}
}
}
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index bf5394ec01..db94563b7a 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -362,13 +362,18 @@ impl AsLogicalPlan for LogicalPlanNode {
"logical_plan::from_proto() Unsupported file
format '{self:?}'"
))
})? {
- #[cfg(feature = "parquet")]
+ #[cfg_attr(not(feature = "parquet"),
allow(unused_variables))]
FileFormatType::Parquet(protobuf::ParquetFormat
{options}) => {
- let mut parquet = ParquetFormat::default();
- if let Some(options) = options {
- parquet =
parquet.with_options(options.try_into()?)
+ #[cfg(feature = "parquet")]
+ {
+ let mut parquet = ParquetFormat::default();
+ if let Some(options) = options {
+ parquet =
parquet.with_options(options.try_into()?)
+ }
+ Arc::new(parquet)
}
- Arc::new(parquet)
+ #[cfg(not(feature = "parquet"))]
+ panic!("Unable to process parquet file since
`parquet` feature is not enabled");
}
FileFormatType::Csv(protobuf::CsvFormat {
options
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index e1cc37091b..6abfc71288 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -234,30 +234,35 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)),
- #[cfg(feature = "parquet")]
+ #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
PhysicalPlanType::ParquetScan(scan) => {
- let base_config = parse_protobuf_file_scan_config(
- scan.base_conf.as_ref().unwrap(),
- registry,
- extension_codec,
- )?;
- let predicate = scan
- .predicate
- .as_ref()
- .map(|expr| {
- parse_physical_expr(
- expr,
- registry,
- base_config.file_schema.as_ref(),
- extension_codec,
- )
- })
- .transpose()?;
- let mut builder = ParquetExec::builder(base_config);
- if let Some(predicate) = predicate {
- builder = builder.with_predicate(predicate)
+ #[cfg(feature = "parquet")]
+ {
+ let base_config = parse_protobuf_file_scan_config(
+ scan.base_conf.as_ref().unwrap(),
+ registry,
+ extension_codec,
+ )?;
+ let predicate = scan
+ .predicate
+ .as_ref()
+ .map(|expr| {
+ parse_physical_expr(
+ expr,
+ registry,
+ base_config.file_schema.as_ref(),
+ extension_codec,
+ )
+ })
+ .transpose()?;
+ let mut builder = ParquetExec::builder(base_config);
+ if let Some(predicate) = predicate {
+ builder = builder.with_predicate(predicate)
+ }
+ Ok(builder.build_arc())
}
- Ok(builder.build_arc())
+ #[cfg(not(feature = "parquet"))]
+ panic!("Unable to process a Parquet PhysicalPlan when
`parquet` feature is not enabled")
}
PhysicalPlanType::AvroScan(scan) => {
Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
@@ -1068,35 +1073,45 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
sort_order,
)))
}
+ #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
PhysicalPlanType::ParquetSink(sink) => {
- let input =
- into_physical_plan(&sink.input, registry, runtime,
extension_codec)?;
-
- let data_sink: ParquetSink = sink
- .sink
- .as_ref()
- .ok_or_else(|| proto_error("Missing required field in
protobuf"))?
- .try_into()?;
- let sink_schema = input.schema();
- let sort_order = sink
- .sort_order
- .as_ref()
- .map(|collection| {
- parse_physical_sort_exprs(
- &collection.physical_sort_expr_nodes,
- registry,
- &sink_schema,
- extension_codec,
- )
- .map(|item|
PhysicalSortRequirement::from_sort_exprs(&item))
- })
- .transpose()?;
- Ok(Arc::new(DataSinkExec::new(
- input,
- Arc::new(data_sink),
- sink_schema,
- sort_order,
- )))
+ #[cfg(feature = "parquet")]
+ {
+ let input = into_physical_plan(
+ &sink.input,
+ registry,
+ runtime,
+ extension_codec,
+ )?;
+
+ let data_sink: ParquetSink = sink
+ .sink
+ .as_ref()
+ .ok_or_else(|| proto_error("Missing required field in
protobuf"))?
+ .try_into()?;
+ let sink_schema = input.schema();
+ let sort_order = sink
+ .sort_order
+ .as_ref()
+ .map(|collection| {
+ parse_physical_sort_exprs(
+ &collection.physical_sort_expr_nodes,
+ registry,
+ &sink_schema,
+ extension_codec,
+ )
+ .map(|item|
PhysicalSortRequirement::from_sort_exprs(&item))
+ })
+ .transpose()?;
+ Ok(Arc::new(DataSinkExec::new(
+ input,
+ Arc::new(data_sink),
+ sink_schema,
+ sort_order,
+ )))
+ }
+ #[cfg(not(feature = "parquet"))]
+ panic!("Trying to use ParquetSink without `parquet` feature
enabled");
}
PhysicalPlanType::Unnest(unnest) => {
let input = into_physical_plan(
@@ -1954,6 +1969,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
});
}
+ #[cfg(feature = "parquet")]
if let Some(sink) =
exec.sink().as_any().downcast_ref::<ParquetSink>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type:
Some(PhysicalPlanType::ParquetSink(Box::new(
diff --git
a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp
b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp
new file mode 100644
index 0000000000..00e74a207b
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Join Tests
+##########
+
+# turn off repartition_joins
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+include ./join.slt
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index d69662f75d..24ffb963bb 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -81,21 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
# When filter pushdown *is* enabled, ParquetExec can filter exactly,
# not just metadata, so we expect to see no FilterExec
-# once https://github.com/apache/datafusion/issues/4028 is fixed
query T
select a from t_pushdown where b > 2 ORDER BY a;
----
@@ -133,16 +127,11 @@ EXPLAIN select a from t_pushdown where b > 2 AND a IS NOT NULL order by a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2) AND t_pushdown.a IS NOT NULL
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2 AND a@0 IS NOT NULL, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END AND a_null_count@4 != a_row_count@3, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END AND a_null_count@4 != a_row_count@3, required_guarantees=[]
query I
@@ -155,16 +144,11 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b;
----
logical_plan
01)Sort: t_pushdown.b ASC NULLS LAST
-02)--Projection: t_pushdown.b
-03)----Filter: t_pushdown.a = Utf8("bar")
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.a = Utf8("bar")]
+02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8("bar")]
physical_plan
01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: a@0 = bar, projection=[b@1]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=a@0 = bar, pruning_predicate=CASE WHEN a_null_count@2 = a_row_count@3 THEN false ELSE a_min@0 <= bar AND bar <= a_max@1 END, required_guarantees=[a in (bar)]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], predicate=a@0 = bar, pruning_predicate=CASE WHEN a_null_count@2 = a_row_count@3 THEN false ELSE a_min@0 <= bar AND bar <= a_max@1 END, required_guarantees=[a in (bar)]
## cleanup
statement ok
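
To make the expected-plan changes above concrete: once Parquet filter pushdown is enabled, the predicate shows up as full_filters on the TableScan and the physical plan reads straight from the sort into ParquetExec with no FilterExec. A minimal sketch of how a user might observe this from Rust; the table name and file path are illustrative stand-ins, and the config key is the same knob the slt file toggles:

    use datafusion::error::Result;
    use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        // Enable exact Parquet filter pushdown.
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.pushdown_filters", true);
        let ctx = SessionContext::new_with_config(config);

        // "t_pushdown" and "data/example.parquet" are hypothetical.
        ctx.register_parquet("t_pushdown", "data/example.parquet", ParquetReadOptions::default())
            .await?;

        // With pushdown enabled, the plan should show the predicate on the scan
        // itself (full_filters / ParquetExec predicate) and no FilterExec.
        ctx.sql("EXPLAIN SELECT a FROM t_pushdown WHERE b > 2 ORDER BY a")
            .await?
            .show()
            .await?;
        Ok(())
    }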
diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt
index 2d59ad2b5b..630674bb09 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -146,4 +146,4 @@ FROM t1 WHERE ((false > (v1 = v1)) IS DISTINCT FROM true);
statement ok
DROP TABLE t1;
-# End repartition on empty columns test
\ No newline at end of file
+# End repartition on empty columns test
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index bdd8deff18..05de3e0b80 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1755,4 +1755,4 @@ SELECT "test.a" FROM (SELECT a AS "test.a" FROM test)
1
statement ok
-DROP TABLE test;
\ No newline at end of file
+DROP TABLE test;
diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt
index 7df43bb7ed..c3b5fa8fc4 100644
--- a/datafusion/sqllogictest/test_files/string_view.slt
+++ b/datafusion/sqllogictest/test_files/string_view.slt
@@ -1638,4 +1638,3 @@ select column2|| ' ' ||column3 from temp;
----
rust fast
datafusion cool
-
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 505c66aef0..1f90b94aee 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4872,4 +4872,4 @@ query error DataFusion error: Execution error: Expected a signed integer literal
SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
statement ok
-DROP TABLE t1;
\ No newline at end of file
+DROP TABLE t1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]