alamb commented on code in PR #8107:
URL: https://github.com/apache/arrow-datafusion/pull/8107#discussion_r1388311964
##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -42,64 +42,168 @@ use indexmap::IndexMap;
pub type EquivalenceClass = Vec<Arc<dyn PhysicalExpr>>;
/// Stores the mapping between source expressions and target expressions for a
-/// projection.
+/// projection, and the output (post-projection) schema of the table.
#[derive(Debug, Clone)]
pub struct ProjectionMapping {
- /// `(source expression)` --> `(target expression)`
- /// Indices in the vector corresponds to the indices after projection.
- inner: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
+ /// Mapping between source expressions and target expressions.
+ /// Vector indices correspond to the indices after projection.
+ map: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
+ /// Output (post-projection) schema.
+ output_schema: SchemaRef,
}
impl ProjectionMapping {
/// Constructs the mapping between a projection's input and output
/// expressions.
///
- /// For example, given the input projection expressions (`a+b`, `c+d`)
- /// and an output schema with two columns `"c+d"` and `"a+b"`
- /// the projection mapping would be
+ /// For example, given the input projection expressions (`a + b`, `c + d`)
+ /// and an output schema with two columns `"c + d"` and `"a + b"`, the
+ /// projection mapping would be:
+ ///
/// ```text
- /// [0]: (c+d, col("c+d"))
- /// [1]: (a+b, col("a+b"))
+ /// [0]: (c + d, col("c + d"))
+ /// [1]: (a + b, col("a + b"))
/// ```
- /// where `col("c+d")` means the column named "c+d".
+ ///
+ /// where `col("c + d")` means the column named `"c + d"`.
pub fn try_new(
expr: &[(Arc<dyn PhysicalExpr>, String)],
input_schema: &SchemaRef,
) -> Result<Self> {
// Construct a map from the input expressions to the output expression
of the projection:
- let mut inner = vec![];
- for (expr_idx, (expression, name)) in expr.iter().enumerate() {
- let target_expr = Arc::new(Column::new(name, expr_idx)) as _;
-
- let source_expr = expression.clone().transform_down(&|e| match e
- .as_any()
- .downcast_ref::<Column>(
- ) {
- Some(col) => {
- // Sometimes, expression and its name in the input_schema
doesn't match.
- // This can cause problems. Hence in here we make sure
that expression name
- // matches with the name in the inout_schema.
- // Conceptually, source_expr and expression should be same.
- let idx = col.index();
- let matching_input_field = input_schema.field(idx);
- let matching_input_column =
- Column::new(matching_input_field.name(), idx);
- Ok(Transformed::Yes(Arc::new(matching_input_column)))
- }
- None => Ok(Transformed::No(e)),
- })?;
+ let map = expr
+ .iter()
+ .enumerate()
+ .map(|(expr_idx, (expression, name))| {
+ let target_expr = Arc::new(Column::new(name, expr_idx)) as _;
+ expression
+ .clone()
+ .transform_down(&|e| match
e.as_any().downcast_ref::<Column>() {
+ Some(col) => {
+ // Sometimes, an expression and its name in the
input_schema
+ // doesn't match. This can cause problems, so we
make sure
+ // that the expression name matches with the name
in `input_schema`.
+ // Conceptually, `source_expr` and `expression`
should be the same.
+ let idx = col.index();
+ let matching_input_field = input_schema.field(idx);
+ let matching_input_column =
+ Column::new(matching_input_field.name(), idx);
+
Ok(Transformed::Yes(Arc::new(matching_input_column)))
+ }
+ None => Ok(Transformed::No(e)),
+ })
+ .map(|source_expr| (source_expr, target_expr))
+ })
+ .collect::<Result<Vec<_>>>()?;
- inner.push((source_expr, target_expr));
- }
- Ok(Self { inner })
+ // Calculate output schema:
+ let fields = expr
+ .iter()
+ .map(|(e, name)| {
+ Ok(Field::new(
+ name,
+ e.data_type(input_schema)?,
+ e.nullable(input_schema)?,
+ )
+ .with_metadata(get_field_metadata(e,
input_schema).unwrap_or_default()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let output_schema = Arc::new(Schema::new_with_metadata(
+ fields,
+ input_schema.metadata().clone(),
+ ));
+
+ Ok(Self { map, output_schema })
}
/// Iterate over pairs of (source, target) expressions
pub fn iter(
&self,
) -> impl Iterator<Item = &(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>
+ '_ {
- self.inner.iter()
+ self.map.iter()
+ }
+
+ /// Returns a reference to the output (post-projection) schema.
+ pub fn output_schema(&self) -> SchemaRef {
+ self.output_schema.clone()
+ }
+
+ /// This function returns the target expression for a given source
expression.
+ ///
+ /// # Arguments
+ ///
+ /// * `expr` - Source physical expression.
+ ///
+ /// # Returns
+ ///
+ /// An `Option` containing the target expression for the given source.
+ /// `None` means that source is not found inside the mapping.
+ pub fn target_expr(
+ &self,
+ expr: &Arc<dyn PhysicalExpr>,
+ ) -> Option<Arc<dyn PhysicalExpr>> {
+ self.map
+ .iter()
+ .find(|(source, _)| source.eq(expr))
+ .map(|(_, target)| target.clone())
+ }
+
+ /// This function returns the target expressions for all given source
+ /// expressions.
+ ///
+ /// # Arguments
+ ///
+ /// * `exprs` - Source physical expressions.
+ ///
+ /// # Returns
+ ///
+ /// An `Option` containing the target expressions for all the sources.
+ /// If any of the given sources is absent in the mapping, returns `None`.
+ pub fn target_exprs(
+ &self,
+ exprs: &[Arc<dyn PhysicalExpr>],
+ ) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
+ exprs.iter().map(|expr| self.target_expr(expr)).collect()
}
+
+ /// This function projects the given ordering requirement according to this
+ /// mapping.
+ ///
+ /// # Arguments
+ ///
+ /// * `lex_req` - Lexicographical ordering requirement.
+ ///
+ /// # Returns
+ ///
+ /// An `Option` containing the projected lexicorgraphical ordering
requirement.
Review Comment:
```suggestion
/// An `Option` containing the projected lexicographical ordering
requirement.
```
##########
datafusion/sqllogictest/test_files/groupby.slt:
##########
@@ -3846,3 +3846,48 @@ ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT
t1.x), MAX(alias1)@2 as MAX(
------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS
Float64)t1.x@0 as alias1], aggr=[]
--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS
Float64)t1.x, y@1 as y]
----------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+# create an unbounded table that contains ordered timestamp.
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE csv_with_timestamps (
+ name VARCHAR,
+ ts TIMESTAMP
+)
+STORED AS CSV
+WITH ORDER (ts DESC)
+LOCATION '../core/tests/data/timestamps.csv'
+
+# below query should work in streaming mode.
+query TT
+EXPLAIN SELECT date_bin('15 minutes', ts) as time_chunks
+ FROM csv_with_timestamps
+ GROUP BY (date_bin('15 minutes', ts))
+ ORDER BY time_chunks DESC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: time_chunks DESC NULLS FIRST, fetch=5
+----Projection: date_bin(Utf8("15 minutes"),csv_with_timestamps.ts) AS
time_chunks
+------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"),
csv_with_timestamps.ts) AS date_bin(Utf8("15
minutes"),csv_with_timestamps.ts)]], aggr=[[]]
+--------TableScan: csv_with_timestamps projection=[ts]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--SortPreservingMergeExec: [time_chunks@0 DESC], fetch=5
+----ProjectionExec: expr=[date_bin(Utf8("15
minutes"),csv_with_timestamps.ts)@0 as time_chunks]
+------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15
minutes"),csv_with_timestamps.ts)@0 as date_bin(Utf8("15
minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
+--------CoalesceBatchesExec: target_batch_size=2
+----------SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8("15
minutes"),csv_with_timestamps.ts)@0], 8), input_partitions=8,
sort_exprs=date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 DESC
+------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as
date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[],
ordering_mode=Sorted
+--------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
+----------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts],
infinite_source=true, output_ordering=[ts@0 DESC], has_header=false
+
+query P
+SELECT date_bin('15 minutes', ts) as time_chunks
+ FROM csv_with_timestamps
+ GROUP BY (date_bin('15 minutes', ts))
Review Comment:
Is there any reason to add extra parenthesis?
Why is this
```
GROUP BY (date_bin('15 minutes', ts))
```
and not
```
GROUP BY date_bin('15 minutes', ts)
```
##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -869,9 +986,27 @@ impl EquivalenceProperties {
self.ordering_satisfy_requirement(&sort_requirements)
}
- /// Checks whether the given sort requirements are satisfied by any of the
- /// existing orderings.
+ /// Checks whether the given sort requirements are satisfied by any of the
existing orderings.
+ /// This function applies an implicit projection to itself before calling
`ordering_satisfy_requirement_helper`
+ /// to define the orderings of complex [`PhysicalExpr`]'s during analysis.
pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) ->
bool {
+ let exprs = reqs
+ .iter()
+ .map(|sort_req| sort_req.expr.clone())
+ .collect::<Vec<_>>();
+ let projection_mapping = self.implicit_projection_mapping(&exprs);
+ let projected_eqs =
+ self.project(&projection_mapping,
projection_mapping.output_schema());
+ if let Some(projected_reqs) =
projection_mapping.project_lex_reqs(reqs) {
Review Comment:
I am not quite sure I follow this logic -- does it say that any expression
that can be projected maintains the ordering?
What about non monotonic expressions like `abs(x)` ? If the input is
orderered by x
```
-2
-1
0
1
2
```
The output will not be ordered by `abs(xx)`
```
2
1
0
1
2
```
(the same applies to date functions like `extract(day from ts)`
Perhaps we need to check
[FuncMonotonicity](https://docs.rs/datafusion/latest/datafusion/physical_expr/functions/type.FuncMonotonicity.html#)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]