This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5bdc9afd87 [test] add fuzz test for topk (#7772)
5bdc9afd87 is described below
commit 5bdc9afd872f8523a8c8c2c58072ef482f8edcda
Author: Reilly.tang <[email protected]>
AuthorDate: Sat Oct 21 20:05:51 2023 +0800
[test] add fuzz test for topk (#7772)
Signed-off-by: reilly <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 308 ++++++++++++++++++++------
1 file changed, 244 insertions(+), 64 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index d74144b0ab..2615abfd3c 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,89 +22,100 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use arrow_array::{Float64Array, StringArray};
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::{
+ datasource::MemTable,
+ execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+};
+use datafusion_common::{
+ cast::{as_float64_array, as_string_array},
+ TableReference,
+};
use datafusion_execution::memory_pool::GreedyMemoryPool;
-use rand::Rng;
+use datafusion_physical_expr::expressions::col;
+use rand::{rngs::StdRng, Rng, SeedableRng};
use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
+/// Bytes in one kibibyte; the memory-pool sizes in these tests are multiples of it.
+const KB: usize = 1024;
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
+/// Sort Int32 input of several sizes with a small memory pool (`10 * KB`,
+/// i.e. 10240 bytes, despite the test name). Each case tells the harness via
+/// `with_should_spill` whether spilling is expected.
async fn test_sort_1k_mem() {
- SortTest::new()
- .with_int32_batches(5)
- .with_pool_size(10240)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(20000)
- .with_pool_size(10240)
- .with_should_spill(true)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(1000000)
- .with_pool_size(10240)
- .with_should_spill(true)
- .run()
- .await;
+ // Each pair is (total row count passed to `with_int32_batches`, expected spill flag).
+ for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, true)] {
+ SortTest::new()
+ .with_int32_batches(batch_size)
+ .with_pool_size(10 * KB)
+ .with_should_spill(should_spill)
+ .run()
+ .await;
+ }
}
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
+/// Sort Int32 input of several sizes with a `100 * KB` (102400-byte) memory
+/// pool. Each case tells the harness via `with_should_spill` whether spilling
+/// is expected.
async fn test_sort_100k_mem() {
- SortTest::new()
- .with_int32_batches(5)
- .with_pool_size(102400)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(20000)
- .with_pool_size(102400)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(1000000)
- .with_pool_size(102400)
- .with_should_spill(true)
- .run()
- .await;
+ // Each pair is (total row count passed to `with_int32_batches`, expected spill flag).
+ for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
+ SortTest::new()
+ .with_int32_batches(batch_size)
+ .with_pool_size(100 * KB)
+ .with_should_spill(should_spill)
+ .run()
+ .await;
+ }
}
#[tokio::test]
+/// Sort Int32 input of several sizes with an effectively unbounded memory
+/// pool (`usize::MAX`). Every case is marked as not expected to spill.
async fn test_sort_unlimited_mem() {
- SortTest::new()
- .with_int32_batches(5)
- .with_pool_size(usize::MAX)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(20000)
- .with_pool_size(usize::MAX)
- .with_should_spill(false)
- .run()
- .await;
-
- SortTest::new()
- .with_int32_batches(1000000)
- .with_pool_size(usize::MAX)
- .with_should_spill(false)
- .run()
- .await;
+ // Each pair is (total row count passed to `with_int32_batches`, expected spill flag).
+ for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
+ SortTest::new()
+ .with_int32_batches(batch_size)
+ .with_pool_size(usize::MAX)
+ .with_should_spill(should_spill)
+ .run()
+ .await;
+ }
+}
+
+/// TopK fuzz test: for growing input sizes, run `ORDER BY x LIMIT 10` through
+/// DataFusion on Int32, Float64 and Utf8 data. The returned rows are compared
+/// with the expected values computed in memory by `TopKScenario`.
+#[tokio::test]
+async fn test_sort_topk() {
+    for size in [10, 100, 1000, 10000, 1000000] {
+        let mut scenario = TopKScenario::new()
+            .with_limit(10)
+            .with_table_name("t")
+            .with_col_name("x");
+
+        // Int32 column
+        let input = scenario.batches(size, ColType::I32);
+        let result = SortTest::new()
+            .with_input(input)
+            .run_with_limit(&scenario)
+            .await;
+        let actual_i32 = batches_to_vec(&result);
+        assert_eq!(actual_i32, scenario.excepted_i32());
+
+        // Float64 column
+        let input = scenario.batches(size, ColType::F64);
+        let result = SortTest::new()
+            .with_input(input)
+            .run_with_limit(&scenario)
+            .await;
+        let actual_f64 = batches_to_f64_vec(&result);
+        assert_eq!(actual_f64, scenario.excepted_f64());
+
+        // Utf8 column
+        let input = scenario.batches(size, ColType::Str);
+        let result = SortTest::new()
+            .with_input(input)
+            .run_with_limit(&scenario)
+            .await;
+        let actual_str = batches_to_str_vec(&result);
+        assert_eq!(actual_str, scenario.excepted_str());
+    }
}
#[derive(Debug, Default)]
@@ -121,6 +132,11 @@ impl SortTest {
Default::default()
}
+    /// Use `batches` as the input, replacing any previously configured input.
+    /// The outer `Vec` is the list of partitions (`run_with_limit` passes it to
+    /// `MemTable::try_new` as such).
+    fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
+        // `batches` is already owned; move it in rather than deep-copying it.
+        self.input = batches;
+        self
+    }
+
/// Create batches of int32 values of rows
fn with_int32_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_i32_batches(rows)];
@@ -138,6 +154,44 @@ impl SortTest {
self
}
+    /// Register the configured input as an in-memory table named
+    /// `topk_scenario.table_name`. Then run
+    /// `ORDER BY <col_name> ASC NULLS FIRST LIMIT <limit>` on it through the
+    /// DataFrame API and return the collected batches.
+    ///
+    /// # Panics
+    /// Panics if the input holds no record batch (the schema is taken from the
+    /// first one) or if any DataFusion call fails.
+    async fn run_with_limit(&self, topk_scenario: &TopKScenario<'_>) -> Vec<RecordBatch> {
+        let schema = self
+            .input
+            .iter()
+            .flatten()
+            .next()
+            .expect("at least one batch")
+            .schema();
+
+        // Clone the partitions once for the table; `self` keeps its own copy.
+        let table = MemTable::try_new(schema, self.input.clone()).unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_table(
+            TableReference::Bare {
+                table: topk_scenario.table_name.into(),
+            },
+            Arc::new(table),
+        )
+        .unwrap();
+
+        let df = ctx
+            .table(topk_scenario.table_name)
+            .await
+            .unwrap()
+            // Expr::sort(asc = true, nulls_first = true)
+            .sort(vec![
+                datafusion_expr::col(topk_scenario.col_name).sort(true, true)
+            ])
+            .unwrap()
+            // skip 0 rows, fetch at most `limit`
+            .limit(0, Some(topk_scenario.limit))
+            .unwrap();
+
+        df.collect().await.unwrap()
+    }
+
/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) {
@@ -208,6 +262,109 @@ impl SortTest {
}
}
+/// Data type of the single `x` column generated for a TopK scenario
+/// (see `TopKScenario::batches`).
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum ColType {
+    /// `Int32` values from `make_staggered_i32_batches`.
+    I32,
+    /// `Float64` values from `make_staggered_f64_batches`.
+    F64,
+    /// String values from `make_staggered_str_batches`.
+    Str,
+}
+
+/// Settings and generated data for one TopK (`ORDER BY ... LIMIT n`) check.
+#[derive(Debug)]
+struct TopKScenario<'a> {
+    /// Number of rows the TopK query keeps.
+    limit: usize,
+    /// Most recently generated input, as a list of partitions.
+    batches: Vec<Vec<RecordBatch>>,
+    /// Name under which the input table is registered.
+    table_name: &'a str,
+    /// Column the query sorts by.
+    col_name: &'a str,
+}
+
+impl<'a> TopKScenario<'a> {
+    /// Create an empty scenario: `limit` 0, no input data and empty
+    /// table/column names. Fill it in with the `with_*` builders.
+    fn new() -> Self {
+        TopKScenario {
+            limit: 0,
+            batches: vec![],
+            table_name: "",
+            col_name: "",
+        }
+    }
+
+    /// Set the number of rows the TopK query should keep.
+    fn with_limit(mut self, limit: usize) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Set the name under which the input table is registered.
+    fn with_table_name(mut self, table_name: &'a str) -> Self {
+        self.table_name = table_name;
+        self
+    }
+
+    /// Set the column the query sorts by.
+    fn with_col_name(mut self, col_name: &'a str) -> Self {
+        self.col_name = col_name;
+        self
+    }
+
+    /// Generate `len` rows of type `t` as a single partition. The data is
+    /// stored for computing expected results, and a copy is returned to use as
+    /// query input.
+    fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
+        let batches = match t {
+            ColType::I32 => make_staggered_i32_batches(len),
+            ColType::F64 => make_staggered_f64_batches(len),
+            ColType::Str => make_staggered_str_batches(len),
+        };
+        self.batches = vec![batches];
+        self.batches.clone()
+    }
+
+    /// Expected TopK output for `Int32` input: the first `limit` values of the
+    /// stored data as ordered by `partitions_to_sorted_vec` (presumably
+    /// ascending, matching the query's `ASC`), or all values if fewer exist.
+    /// ("excepted" in these method names means "expected".)
+    fn excepted_i32(&self) -> Vec<Option<i32>> {
+        let mut expected = partitions_to_sorted_vec(&self.batches);
+        // LIMIT returns every row when the input is smaller than `limit`;
+        // slicing with `[0..self.limit]` would panic in that case.
+        expected.truncate(self.limit);
+        expected
+    }
+
+    /// Expected TopK output for `Float64` input: the `limit` smallest values
+    /// of the stored data in ascending order (all of them if fewer exist).
+    fn excepted_f64(&self) -> Vec<Option<f64>> {
+        let mut expected: Vec<Option<f64>> = self
+            .batches
+            .iter()
+            .flat_map(|partition| batches_to_f64_vec(partition))
+            .collect();
+        // `partial_cmp` only fails on NaN; generated values come from a finite
+        // `gen_range`, so the unwrap cannot fire for data built by `batches`.
+        expected.sort_by(|a, b| a.partial_cmp(b).unwrap());
+        expected.truncate(self.limit);
+        expected
+    }
+
+    /// Expected TopK output for string input: the `limit` smallest strings of
+    /// the stored data in ascending byte order (all of them if fewer exist).
+    fn excepted_str(&self) -> Vec<Option<&str>> {
+        let mut expected: Vec<Option<&str>> = self
+            .batches
+            .iter()
+            .flat_map(|partition| batches_to_str_vec(partition))
+            .collect();
+        expected.sort_unstable();
+        expected.truncate(self.limit);
+        expected
+    }
+}
+
+/// The default scenario is the empty one built by [`TopKScenario::new`].
+impl Default for TopKScenario<'_> {
+    fn default() -> Self {
+        TopKScenario::new()
+    }
+}
+
+/// Return `len` random `Float64` values in `[0.0, 1000.7)` in a column named
+/// `x`, split up by `stagger_batch` (presumably into randomly sized batches).
+/// The values come from an `StdRng` seeded with 100, so they are the same for
+/// a given `len` on every call.
+fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
+    let mut rng = StdRng::seed_from_u64(100);
+    let values = Float64Array::from_iter_values((0..len).map(|_| rng.gen_range(0.0..1000.7)));
+    let column: ArrayRef = Arc::new(values);
+    let batch = RecordBatch::try_from_iter(vec![("x", column)]).unwrap();
+    stagger_batch(batch)
+}
+
+/// Return `len` random 6-character alphanumeric strings in a column named `x`,
+/// split up by `stagger_batch` (presumably into randomly sized batches).
+fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
+    let strings = (0..len).map(|_| get_random_string(6));
+    let column: ArrayRef = Arc::new(StringArray::from_iter_values(strings));
+    let batch = RecordBatch::try_from_iter(vec![("x", column)]).unwrap();
+    stagger_batch(batch)
+}
+
/// Return randomly sized record batches in a field named 'x' of type `Int32`
/// with randomized i32 content
fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
@@ -232,3 +389,26 @@ fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
}
batches
}
+
+/// Build a string of `len` random ASCII letters and digits, drawn from the
+/// thread-local RNG with the `Alphanumeric` distribution.
+fn get_random_string(len: usize) -> String {
+    let mut rng = rand::thread_rng();
+    std::iter::repeat_with(|| char::from(rng.sample(rand::distributions::Alphanumeric)))
+        .take(len)
+        .collect()
+}
+
+/// Concatenate column 0 of every batch, read as `Float64`, into one vector
+/// (nulls become `None`).
+///
+/// # Panics
+/// Panics if a batch has no columns or its column 0 is not a `Float64Array`.
+fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
+    let mut values = Vec::new();
+    for batch in batches {
+        let column = as_float64_array(batch.column(0)).unwrap();
+        values.extend(column.iter());
+    }
+    values
+}
+
+/// Concatenate column 0 of every batch, read as a string array, into one
+/// vector of borrowed string slices (nulls become `None`).
+///
+/// # Panics
+/// Panics if a batch has no columns or its column 0 is not a `StringArray`.
+fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
+    let mut values = Vec::new();
+    for batch in batches {
+        let column = as_string_array(batch.column(0)).unwrap();
+        values.extend(column.iter());
+    }
+    values
+}