This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d7c6a7f fix: scan with predicate (#1617)
5d7c6a7f is described below

commit 5d7c6a7ff651a84e08efdb70ce8df496c9b86dcb
Author: 鲍金日 <[email protected]>
AuthorDate: Mon Dec 23 17:23:57 2024 +0800

    fix: scan with predicate (#1617)
    
    ## Rationale
    Support scanning with filter predicates. Previously the predicate was only
    passed to `ParquetExec` for row-group pruning, so rows that survived pruning
    were returned unfiltered; scans must also apply the predicate exactly.
    
    ## Detailed Changes
    - `read.rs`: when predicates are present, keep pushing them into
      `ParquetExec` for pruning and additionally wrap the plan in a
      `FilterExec` so surviving rows are filtered exactly.
    - `compaction/scheduler.rs`: drop an unused `trigger_tx` clone.
    - Add predicate-scan test coverage in `read.rs` and `storage.rs`.
    
    ## Test Plan
    CI
---
 Cargo.lock                                    |  4 +--
 src/metric_engine/src/compaction/scheduler.rs |  1 -
 src/metric_engine/src/read.rs                 | 40 +++++++++++++++++++--------
 src/metric_engine/src/storage.rs              | 27 ++++++++++++++++++
 4 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8cace345..85001036 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2637,7 +2637,7 @@ checksum = 
"0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
 dependencies = [
  "bytes",
  "heck",
- "itertools 0.13.0",
+ "itertools 0.10.5",
  "log",
  "multimap",
  "once_cell",
@@ -2657,7 +2657,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
 dependencies = [
  "anyhow",
- "itertools 0.13.0",
+ "itertools 0.10.5",
  "proc-macro2",
  "quote",
  "syn",
diff --git a/src/metric_engine/src/compaction/scheduler.rs 
b/src/metric_engine/src/compaction/scheduler.rs
index d947096e..581d397b 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -64,7 +64,6 @@ impl Scheduler {
         let task_handle = {
             let store = store.clone();
             let manifest = manifest.clone();
-            let trigger_tx = trigger_tx.clone();
             let executor = Executor::new(
                 runtime.clone(),
                 store,
diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs
index 1dea5c6c..0d274337 100644
--- a/src/metric_engine/src/read.rs
+++ b/src/metric_engine/src/read.rs
@@ -42,8 +42,9 @@ use datafusion::{
     parquet::arrow::async_reader::AsyncFileReader,
     physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{
-        metrics::ExecutionPlanMetricsSet, 
sorts::sort_preserving_merge::SortPreservingMergeExec,
-        DisplayAs, Distribution, ExecutionPlan, PlanProperties,
+        filter::FilterExec, metrics::ExecutionPlanMetricsSet,
+        sorts::sort_preserving_merge::SortPreservingMergeExec, DisplayAs, 
Distribution,
+        ExecutionPlan, PlanProperties,
     },
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, Expr},
@@ -430,17 +431,28 @@ impl ParquetReader {
         let mut builder = 
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
             Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
         );
-        if let Some(expr) = conjunction(predicates) {
-            let filters = create_physical_expr(&expr, &df_schema, 
&ExecutionProps::new())
-                .context("create physical expr")?;
-            builder = builder.with_predicate(filters);
-        }
+        let base_plan: Arc<dyn ExecutionPlan> = match conjunction(predicates) {
+            Some(expr) => {
+                let filters = create_physical_expr(&expr, &df_schema, 
&ExecutionProps::new())
+                    .context("create physical expr")?;
+
+                builder = builder.with_predicate(filters.clone());
+                let parquet_exec = builder.build();
+
+                let filter_exec = FilterExec::try_new(filters, 
Arc::new(parquet_exec))
+                    .context("create filter exec")?;
+                Arc::new(filter_exec)
+            }
+            None => {
+                let parquet_exec = builder.build();
+                Arc::new(parquet_exec)
+            }
+        };
 
         // TODO: fetch using multiple threads since read from parquet will 
incur CPU
         // when convert between arrow and parquet.
-        let parquet_exec = builder.build();
-        let sort_exec = SortPreservingMergeExec::new(sort_exprs, 
Arc::new(parquet_exec))
-            .with_round_robin_repartition(true);
+        let sort_exec =
+            SortPreservingMergeExec::new(sort_exprs, 
base_plan).with_round_robin_repartition(true);
 
         let merge_exec = MergeExec::new(
             Arc::new(sort_exec),
@@ -459,6 +471,7 @@ impl ParquetReader {
 
 #[cfg(test)]
 mod tests {
+    use datafusion::logical_expr::{col, lit};
     use object_store::local::LocalFileSystem;
     use test_log::test;
 
@@ -542,6 +555,8 @@ mod tests {
             },
             Arc::new(SstPathGenerator::new("mock".to_string())),
         );
+
+        let expr = col("pk1").eq(lit(0_u8));
         let plan = reader
             .build_df_plan(
                 (100..103)
@@ -558,7 +573,7 @@ mod tests {
                     })
                     .collect(),
                 None,
-                vec![],
+                vec![expr],
             )
             .unwrap();
         let display_plan =
@@ -567,7 +582,8 @@ mod tests {
         assert_eq!(
             r#"MergeExec: [primary_keys: 1, seq_idx: 2]
   SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
-    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], 
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], 
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], 
[pk1@0 ASC, __seq__@2 ASC]]
+    FilterExec: pk1@0 = 0
+      ParquetExec: file_groups={3 groups: [[mock/data/100.sst], 
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], 
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], 
[pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN 
pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= 
pk1_max@1 END, required_guarantees=[pk1 in (0)]
 "#,
             format!("{display_plan}")
         );
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index b86ad803..2d5b3a6b 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -404,6 +404,7 @@ impl TimeMergeStorage for CloudObjectStorage {
 
 #[cfg(test)]
 mod tests {
+    use datafusion::logical_expr::{col, lit};
     use object_store::local::LocalFileSystem;
     use test_log::test;
 
@@ -488,6 +489,32 @@ mod tests {
             ];
 
             check_stream(result_stream, expected_batch).await;
+
+            // test with predicate
+            let expr = col("pk1").eq(lit(11_u8));
+            let result_stream = storage
+                .scan(ScanRequest {
+                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                    predicate: vec![expr],
+                    projections: None,
+                })
+                .await
+                .unwrap();
+            let expected_batch = [
+                record_batch!(
+                    ("pk1", UInt8, vec![11]),
+                    ("pk2", UInt8, vec![99]),
+                    ("value", Int64, vec![77])
+                )
+                .unwrap(),
+                record_batch!(
+                    ("pk1", UInt8, vec![11]),
+                    ("pk2", UInt8, vec![100]),
+                    ("value", Int64, vec![22])
+                )
+                .unwrap(),
+            ];
+            check_stream(result_stream, expected_batch).await;
         });
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to