This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bdc9afd87 [test] add fuzz test for topk (#7772)
5bdc9afd87 is described below

commit 5bdc9afd872f8523a8c8c2c58072ef482f8edcda
Author: Reilly.tang <[email protected]>
AuthorDate: Sat Oct 21 20:05:51 2023 +0800

    [test] add fuzz test for topk (#7772)
    
    Signed-off-by: reilly <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 308 ++++++++++++++++++++------
 1 file changed, 244 insertions(+), 64 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index d74144b0ab..2615abfd3c 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,89 +22,100 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use arrow_array::{Float64Array, StringArray};
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::{
+    datasource::MemTable,
+    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+};
+use datafusion_common::{
+    cast::{as_float64_array, as_string_array},
+    TableReference,
+};
 use datafusion_execution::memory_pool::GreedyMemoryPool;
-use rand::Rng;
+use datafusion_physical_expr::expressions::col;
+use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
 
+const KB: usize = 1 << 10;
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_1k_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(10240)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(10240)
-        .with_should_spill(true)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(10240)
-        .with_should_spill(true)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, 
true)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(10 * KB)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_100k_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(102400)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(102400)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(102400)
-        .with_should_spill(true)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, 
true)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(100 * KB)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
 #[tokio::test]
 async fn test_sort_unlimited_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, 
false)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(usize::MAX)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
+}
+
+#[tokio::test]
+async fn test_sort_topk() {
+    for size in [10, 100, 1000, 10000, 1000000] {
+        let mut topk_scenario = TopKScenario::new()
+            .with_limit(10)
+            .with_table_name("t")
+            .with_col_name("x");
+
+        // test topk with i32
+        let collected_i32 = SortTest::new()
+            .with_input(topk_scenario.batches(size, ColType::I32))
+            .run_with_limit(&topk_scenario)
+            .await;
+        let actual = batches_to_vec(&collected_i32);
+        let excepted_i32 = topk_scenario.excepted_i32();
+        assert_eq!(actual, excepted_i32);
+
+        // test topk with f64
+        let collected_f64 = SortTest::new()
+            .with_input(topk_scenario.batches(size, ColType::F64))
+            .run_with_limit(&topk_scenario)
+            .await;
+        let actual: Vec<Option<f64>> = batches_to_f64_vec(&collected_f64);
+        let excepted_f64 = topk_scenario.excepted_f64();
+        assert_eq!(actual, excepted_f64);
+
+        // test topk with str
+        let collected_str = SortTest::new()
+            .with_input(topk_scenario.batches(size, ColType::Str))
+            .run_with_limit(&topk_scenario)
+            .await;
+        let actual: Vec<Option<&str>> = batches_to_str_vec(&collected_str);
+        let excepted_str = topk_scenario.excepted_str();
+        assert_eq!(actual, excepted_str);
+    }
 }
 
 #[derive(Debug, Default)]
@@ -121,6 +132,11 @@ impl SortTest {
         Default::default()
     }
 
+    fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
+        self.input = batches.clone();
+        self
+    }
+
     /// Create batches of int32 values of rows
     fn with_int32_batches(mut self, rows: usize) -> Self {
         self.input = vec![make_staggered_i32_batches(rows)];
@@ -138,6 +154,44 @@ impl SortTest {
         self
     }
 
+    async fn run_with_limit<'a>(
+        &self,
+        topk_scenario: &TopKScenario<'a>,
+    ) -> Vec<RecordBatch> {
+        let input = self.input.clone();
+        let schema = input
+            .iter()
+            .flat_map(|p| p.iter())
+            .next()
+            .expect("at least one batch")
+            .schema();
+
+        let table = MemTable::try_new(schema, input.clone()).unwrap();
+
+        let ctx = SessionContext::new();
+
+        ctx.register_table(
+            TableReference::Bare {
+                table: topk_scenario.table_name.into(),
+            },
+            Arc::new(table),
+        )
+        .unwrap();
+
+        let df = ctx
+            .table(topk_scenario.table_name)
+            .await
+            .unwrap()
+            .sort(vec![
+                datafusion_expr::col(topk_scenario.col_name).sort(true, true)
+            ])
+            .unwrap()
+            .limit(0, Some(topk_scenario.limit))
+            .unwrap();
+
+        df.collect().await.unwrap()
+    }
+
     /// Sort the input using SortExec and ensure the results are
     /// correct according to `Vec::sort` both with and without spilling
     async fn run(&self) {
@@ -208,6 +262,109 @@ impl SortTest {
     }
 }
 
+enum ColType {
+    I32,
+    F64,
+    Str,
+}
+
+struct TopKScenario<'a> {
+    limit: usize,
+    batches: Vec<Vec<RecordBatch>>,
+    table_name: &'a str,
+    col_name: &'a str,
+}
+
+impl<'a> TopKScenario<'a> {
+    fn new() -> Self {
+        TopKScenario {
+            limit: 0,
+            batches: vec![],
+            table_name: "",
+            col_name: "",
+        }
+    }
+
+    fn with_limit(mut self, limit: usize) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    fn with_table_name(mut self, table_name: &'a str) -> Self {
+        self.table_name = table_name;
+        self
+    }
+
+    fn with_col_name(mut self, col_name: &'a str) -> Self {
+        self.col_name = col_name;
+        self
+    }
+
+    fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
+        let batches = match t {
+            ColType::I32 => make_staggered_i32_batches(len),
+            ColType::F64 => make_staggered_f64_batches(len),
+            ColType::Str => make_staggered_str_batches(len),
+        };
+        self.batches = vec![batches];
+        self.batches.clone()
+    }
+
+    fn excepted_i32(&self) -> Vec<Option<i32>> {
+        let excepted = partitions_to_sorted_vec(&self.batches);
+        excepted[0..self.limit].into()
+    }
+
+    fn excepted_f64(&self) -> Vec<Option<f64>> {
+        let mut excepted: Vec<Option<f64>> = self
+            .batches
+            .iter()
+            .flat_map(|batches| batches_to_f64_vec(batches).into_iter())
+            .collect();
+        excepted.sort_by(|a, b| a.partial_cmp(b).unwrap());
+        excepted[0..self.limit].into()
+    }
+
+    fn excepted_str(&self) -> Vec<Option<&str>> {
+        let mut excepted: Vec<Option<&str>> = self
+            .batches
+            .iter()
+            .flat_map(|batches| batches_to_str_vec(batches).into_iter())
+            .collect();
+        excepted.sort_unstable();
+        excepted[0..self.limit].into()
+    }
+}
+
+impl Default for TopKScenario<'_> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
+    let mut rng = StdRng::seed_from_u64(100);
+    let remainder = RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(Float64Array::from_iter_values(
+            (0..len).map(|_| rng.gen_range(0.0..1000.7)),
+        )) as ArrayRef,
+    )])
+    .unwrap();
+    stagger_batch(remainder)
+}
+
+fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
+    let remainder = RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(StringArray::from_iter_values(
+            (0..len).map(|_| get_random_string(6)),
+        )) as ArrayRef,
+    )])
+    .unwrap();
+    stagger_batch(remainder)
+}
+
 /// Return randomly sized record batches in a field named 'x' of type `Int32`
 /// with randomized i32 content
 fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
@@ -232,3 +389,26 @@ fn make_staggered_i32_batches(len: usize) -> 
Vec<RecordBatch> {
     }
     batches
 }
+
+/// Return random ASCII String with len
+fn get_random_string(len: usize) -> String {
+    rand::thread_rng()
+        .sample_iter(rand::distributions::Alphanumeric)
+        .take(len)
+        .map(char::from)
+        .collect()
+}
+
+fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
+    batches
+        .iter()
+        .flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter())
+        .collect()
+}
+
+fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
+    batches
+        .iter()
+        .flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter())
+        .collect()
+}

Reply via email to