This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c763fda90d Make `required_guarantees` output to be deterministic (#12484)
c763fda90d is described below
commit c763fda90db819e4dbf5529b8bfcbe4b00805847
Author: Austin Liu <[email protected]>
AuthorDate: Wed Sep 18 19:19:02 2024 +0800
Make `required_guarantees` output to be deterministic (#12484)
* Make output to be deterministic by adding sort
Signed-off-by: Austin Liu <[email protected]>
Make output to be deterministic by adding sort
Signed-off-by: Austin Liu <[email protected]>
Format
Signed-off-by: Austin Liu <[email protected]>
* Add deterministic test for `required_guarantees`
Signed-off-by: Austin Liu <[email protected]>
---------
Signed-off-by: Austin Liu <[email protected]>
---
.../src/datasource/physical_plan/parquet/mod.rs | 119 ++++++++++++++++++++-
datafusion/physical-expr/src/utils/guarantee.rs | 15 +--
2 files changed, 119 insertions(+), 15 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index f22d02699a..7d3342db5c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -613,14 +613,16 @@ impl DisplayAs for ParquetExec {
.pruning_predicate
.as_ref()
.map(|pre| {
+ let mut guarantees = pre
+ .literal_guarantees()
+ .iter()
+ .map(|item| format!("{}", item))
+ .collect_vec();
+ guarantees.sort();
format!(
", pruning_predicate={}, required_guarantees=[{}]",
pre.predicate_expr(),
- pre.literal_guarantees()
- .iter()
- .map(|item| format!("{}", item))
- .collect_vec()
- .join(", ")
+ guarantees.join(", ")
)
})
.unwrap_or_default();
@@ -1808,6 +1810,26 @@ mod tests {
create_batch(vec![("c1", c1.clone())])
}
+ /// Returns a int64 array with contents:
+ /// "[-1, 1, null, 2, 3, null, null]"
+ fn int64_batch() -> RecordBatch {
+ let contents: ArrayRef = Arc::new(Int64Array::from(vec![
+ Some(-1),
+ Some(1),
+ None,
+ Some(2),
+ Some(3),
+ None,
+ None,
+ ]));
+
+ create_batch(vec![
+ ("a", contents.clone()),
+ ("b", contents.clone()),
+ ("c", contents.clone()),
+ ])
+ }
+
#[tokio::test]
async fn parquet_exec_metrics() {
// batch1: c1(string)
@@ -1873,6 +1895,93 @@ mod tests {
assert_contains!(&display, "projection=[c1]");
}
+ #[tokio::test]
+ async fn parquet_exec_display_deterministic() {
+ // batches: a(int64), b(int64), c(int64)
+ let batches = int64_batch();
+
+ fn extract_required_guarantees(s: &str) -> Option<&str> {
+ s.split("required_guarantees=").nth(1)
+ }
+
+ // Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions
+ for _ in 0..100 {
+ // c = 1 AND b = 1 AND a = 1
+ let filter0 = col("c")
+ .eq(lit(1))
+ .and(col("b").eq(lit(1)))
+ .and(col("a").eq(lit(1)));
+
+ let rt0 = RoundTrip::new()
+ .with_predicate(filter0)
+ .with_pushdown_predicate()
+ .round_trip(vec![batches.clone()])
+ .await;
+
+ let pruning_predicate = &rt0.parquet_exec.pruning_predicate;
+ assert!(pruning_predicate.is_some());
+
+ let display0 = displayable(rt0.parquet_exec.as_ref())
+ .indent(true)
+ .to_string();
+
+ let guarantees0: &str = extract_required_guarantees(&display0)
+ .expect("Failed to extract required_guarantees");
+ // Compare only the required_guarantees part (Because the file_groups part will not be the same)
+ assert_eq!(
+ guarantees0.trim(),
+ "[a in (1), b in (1), c in (1)]",
+ "required_guarantees don't match"
+ );
+ }
+
+ // c = 1 AND a = 1 AND b = 1
+ let filter1 = col("c")
+ .eq(lit(1))
+ .and(col("a").eq(lit(1)))
+ .and(col("b").eq(lit(1)));
+
+ let rt1 = RoundTrip::new()
+ .with_predicate(filter1)
+ .with_pushdown_predicate()
+ .round_trip(vec![batches.clone()])
+ .await;
+
+ // b = 1 AND a = 1 AND c = 1
+ let filter2 = col("b")
+ .eq(lit(1))
+ .and(col("a").eq(lit(1)))
+ .and(col("c").eq(lit(1)));
+
+ let rt2 = RoundTrip::new()
+ .with_predicate(filter2)
+ .with_pushdown_predicate()
+ .round_trip(vec![batches])
+ .await;
+
+ // should have a pruning predicate
+ let pruning_predicate = &rt1.parquet_exec.pruning_predicate;
+ assert!(pruning_predicate.is_some());
+ let pruning_predicate = &rt2.parquet_exec.pruning_predicate;
+ assert!(pruning_predicate.is_some());
+
+ // convert to explain plan form
+ let display1 = displayable(rt1.parquet_exec.as_ref())
+ .indent(true)
+ .to_string();
+ let display2 = displayable(rt2.parquet_exec.as_ref())
+ .indent(true)
+ .to_string();
+
+ let guarantees1 = extract_required_guarantees(&display1)
+ .expect("Failed to extract required_guarantees");
+ let guarantees2 = extract_required_guarantees(&display2)
+ .expect("Failed to extract required_guarantees");
+
+ // Compare only the required_guarantees part (Because the predicate part will not be the same)
+ assert_eq!(guarantees1, guarantees2, "required_guarantees don't match");
+ }
+
#[tokio::test]
async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
// batch1: c1(string)
diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs
index 4385066529..cd1597217c 100644
--- a/datafusion/physical-expr/src/utils/guarantee.rs
+++ b/datafusion/physical-expr/src/utils/guarantee.rs
@@ -225,26 +225,21 @@ impl LiteralGuarantee {
impl Display for LiteralGuarantee {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ let mut sorted_literals: Vec<_> =
+ self.literals.iter().map(|lit| lit.to_string()).collect();
+ sorted_literals.sort();
match self.guarantee {
Guarantee::In => write!(
f,
"{} in ({})",
self.column.name,
- self.literals
- .iter()
- .map(|lit| lit.to_string())
- .collect::<Vec<_>>()
- .join(", ")
+ sorted_literals.join(", ")
),
Guarantee::NotIn => write!(
f,
"{} not in ({})",
self.column.name,
- self.literals
- .iter()
- .map(|lit| lit.to_string())
- .collect::<Vec<_>>()
- .join(", ")
+ sorted_literals.join(", ")
),
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]