This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new e2970b11 feat: partition table query optimize (#1594)
e2970b11 is described below

commit e2970b1171523b182b36dc67e642641c47db078f
Author: MianChen <[email protected]>
AuthorDate: Mon Nov 25 03:35:48 2024 -0600

    feat: partition table query optimize (#1594)
    
    ## Rationale
    Close #1441
    
    ## Detailed Changes
    ### TLDR
    The performance issue with inlist queries is due to the extra overhead
    from bloom-filter-like directory lookups when scanning each SST file for
    rows. The solution is to create a separate predicate for each partition,
    containing only the keys relevant to that partition. Since the current
    partition filter only supports BinaryExpr(Column, operator, Literal) and
    non-negated InList expressions, this solution will address only those
    specific cases.
    
    ### Changes
    1. During the scan building process, when identifying the partitions for
    a query, we create a PartitionedFilterKeyIndex variable to store the
    predicate key indices for each expression.
    2. In the compute_partition_for_keys_group function, we use a
    HashMap<partition_id, HashMap<filter_index, BTreeSet<key_index>>> to
    record the indices of keys involved in partition computation for each
    group.
    3. In the partitioned_predicates function, we construct the final
    predicates for each partition.
    4. In resolve_partitioned_scan_internal, we generate separate requests
    for each partition.
    
    e.g.
    conditions:
    1. table schema: col_ts, col1, col2, in which col1 and col2 are both
    keys,
         and with two partitions
    2. sql: select * from table where col1 = '33' and col2 in ("aa", "bb",
         "cc", "dd")
    
    partition expectations:
       yield two predicates
          p0: col1 = '33' and col2 in ("aa", "bb", "cc");
          p1: col1 = '33' and col2 in ("dd")
    
    ### Other issues discovered
    When an inlist has fewer than three key args, the Expr is rewritten
    into a nested BinaryExpr, which bypasses the FilterExtractor.
    
    e.g.
    SQL: select * from table where col1 in ("aa", "bb") and col2 in
    (1,2,3,4,5...1000)
    Since ("aa", "bb") has fewer than three elements, the col1 key filter is
    not included in partition computation, which interrupts the partitioning
    process in the get_candidate_partition_keys_groups function, as
    contains_empty_filter is set to true.
    
    
    ## Test Plan
    1. UT: test_partitioned_predicate
    2. Manual test.
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 .../cases/env/cluster/ddl/partition_table.result   |   8 +-
 .../cases/env/cluster/ddl/partition_table.sql      |   4 +-
 .../src/dist_sql_query/physical_plan.rs            |   5 +-
 .../src/dist_sql_query/resolver.rs                 |  16 +-
 .../src/dist_sql_query/test_util.rs                |   2 +
 src/partition_table_engine/src/lib.rs              |  49 ++++-
 src/partition_table_engine/src/partition.rs        |   8 +-
 src/partition_table_engine/src/scan_builder.rs     | 225 ++++++++++++++++++++-
 src/table_engine/src/engine.rs                     |   3 +
 .../src/partition/rule/df_adapter/extractor.rs     |  22 +-
 .../src/partition/rule/df_adapter/mod.rs           |  32 ++-
 src/table_engine/src/partition/rule/key.rs         |  79 ++++++--
 src/table_engine/src/partition/rule/mod.rs         |  12 +-
 src/table_engine/src/partition/rule/random.rs      |  11 +-
 src/table_engine/src/predicate.rs                  |   4 +
 15 files changed, 418 insertions(+), 62 deletions(-)

diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result 
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 980a7bc1..d576d93b 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -99,24 +99,24 @@ 
UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=xx\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = 
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: 
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) 
} }\nscan_table:\n    do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n   
     in [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n    
__partition_table_t_1:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, 
metrics=[output_rows=0, elapsed_compute=xxs]\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Ut 
[...]
 
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n    __partition_table_t_x:\n        
poll_duration=xxs\n        total_duration=xxs\n        wait_duration=xxs\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_x:\n [...]
 
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql 
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index e1f32de5..f06dee2e 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -57,7 +57,7 @@ SELECT * from partition_table_t where name in ("horaedb5", 
"horaedb6", "horaedb7
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
@@ -65,7 +65,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name = 
"ceresdb0";
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
diff --git a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs 
b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index dd430f52..55692f25 100644
--- a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -47,7 +47,7 @@ use datafusion::{
 };
 use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
 use runtime::Priority;
-use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
+use table_engine::{predicate::Predicate, remote::model::TableIdentifier, 
table::ReadRequest};
 use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, 
TraceMetricWhenDrop};
 
 use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, 
TableScanContext};
@@ -62,6 +62,7 @@ pub struct UnresolvedPartitionedScan {
     pub table_scan_ctx: TableScanContext,
     pub metrics_collector: MetricsCollector,
     pub priority: Priority,
+    pub predicates: Option<Vec<Predicate>>,
 }
 
 impl UnresolvedPartitionedScan {
@@ -69,6 +70,7 @@ impl UnresolvedPartitionedScan {
         table_name: &str,
         sub_tables: Vec<TableIdentifier>,
         read_request: ReadRequest,
+        predicates: Option<Vec<Predicate>>,
     ) -> Self {
         let metrics_collector = MetricsCollector::new(table_name.to_string());
         let table_scan_ctx = TableScanContext {
@@ -83,6 +85,7 @@ impl UnresolvedPartitionedScan {
             table_scan_ctx,
             metrics_collector,
             priority: read_request.priority,
+            predicates,
         }
     }
 }
diff --git a/src/df_engine_extensions/src/dist_sql_query/resolver.rs 
b/src/df_engine_extensions/src/dist_sql_query/resolver.rs
index c3724b9d..5fd3430a 100644
--- a/src/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -135,10 +135,22 @@ impl Resolver {
             let sub_tables = unresolved.sub_tables.clone();
             let remote_plans = sub_tables
                 .into_iter()
-                .map(|table| {
+                .enumerate()
+                .map(|(idx, table)| {
                     let plan = Arc::new(UnresolvedSubTableScan {
                         table: table.clone(),
-                        table_scan_ctx: unresolved.table_scan_ctx.clone(),
+                        table_scan_ctx: if let Some(ref predicates) = 
unresolved.predicates {
+                            // Since each partition has a different 
predicate, we build a
+                            // separate ctx for each partition,
+                            // each with its own predicate
+                            let mut ctx = unresolved.table_scan_ctx.clone();
+                            // overwrite the old predicate (the predicate 
before partition
+                            // calculation) with the optimized predicate
+                            ctx.predicate = Arc::new(predicates[idx].clone());
+                            ctx
+                        } else {
+                            unresolved.table_scan_ctx.clone()
+                        },
                     });
                     let sub_metrics_collect = 
metrics_collector.span(table.table.clone());
 
diff --git a/src/df_engine_extensions/src/dist_sql_query/test_util.rs 
b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
index c42f9e38..873c7a22 100644
--- a/src/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -313,6 +313,7 @@ impl TestContext {
             "test",
             sub_tables,
             self.request.clone(),
+            None,
         ));
 
         let filter: Arc<dyn ExecutionPlan> =
@@ -364,6 +365,7 @@ impl TestContext {
             "test",
             self.sub_table_groups[0].clone(),
             self.request.clone(),
+            None,
         ));
 
         self.build_aggr_plan_with_input(unresolved_scan)
diff --git a/src/partition_table_engine/src/lib.rs 
b/src/partition_table_engine/src/lib.rs
index 20904977..ee0b460c 100644
--- a/src/partition_table_engine/src/lib.rs
+++ b/src/partition_table_engine/src/lib.rs
@@ -27,14 +27,17 @@ use std::sync::Arc;
 
 use analytic_engine::TableOptions;
 use async_trait::async_trait;
+use datafusion::logical_expr::expr::{Expr, InList};
 use generic_error::BoxError;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use table_engine::{
     engine::{
         CloseShardRequest, CloseTableRequest, CreateTableParams, 
CreateTableRequest,
-        DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, 
Result, TableEngine,
-        Unexpected, UnexpectedNoCause,
+        DropTableRequest, InvalidPartitionContext, OpenShardRequest, 
OpenShardResult,
+        OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
     },
+    partition::rule::df_adapter::PartitionedFilterKeyIndex,
+    predicate::Predicate,
     remote::RemoteEngineRef,
     table::TableRef,
     PARTITION_TABLE_ENGINE_TYPE,
@@ -110,3 +113,43 @@ impl TableEngine for PartitionTableEngine {
         vec![Ok("".to_string())]
     }
 }
+
+pub fn partitioned_predicates(
+    predicate: Arc<Predicate>,
+    partitions: &[usize],
+    partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+) -> Result<Vec<Predicate>> {
+    ensure!(
+        partitions.len() == partitioned_key_indices.keys().len(),
+        InvalidPartitionContext {
+            msg: format!(
+                "partitions length:{}, partitioned_key_indices length: {}",
+                partitions.len(),
+                partitioned_key_indices.keys().len()
+            )
+        }
+    );
+    let mut predicates = vec![(*predicate).clone(); partitions.len()];
+    for (idx, predicate) in predicates.iter_mut().enumerate() {
+        let partition = partitions[idx];
+        if let Some(filter_indices) = partitioned_key_indices.get(&partition) {
+            let exprs = predicate.mut_exprs();
+            for (filter_idx, key_indices) in filter_indices {
+                if let Expr::InList(InList {
+                    list,
+                    negated: false,
+                    ..
+                }) = &mut exprs[*filter_idx]
+                {
+                    let mut idx = 0;
+                    list.retain(|_| {
+                        let should_kept = key_indices.contains(&idx);
+                        idx += 1;
+                        should_kept
+                    });
+                }
+            }
+        }
+    }
+    Ok(predicates)
+}
diff --git a/src/partition_table_engine/src/partition.rs 
b/src/partition_table_engine/src/partition.rs
index 44072216..2eb0a6a3 100644
--- a/src/partition_table_engine/src/partition.rs
+++ b/src/partition_table_engine/src/partition.rs
@@ -33,8 +33,8 @@ use table_engine::{
     partition::{
         format_sub_partition_table_name,
         rule::{
-            df_adapter::DfPartitionRuleAdapter, PartitionedRow, 
PartitionedRows,
-            PartitionedRowsIter,
+            df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
+            PartitionedRow, PartitionedRows, PartitionedRowsIter,
         },
         PartitionInfo,
     },
@@ -289,14 +289,14 @@ impl Table for PartitionTableImpl {
                     .context(CreatePartitionRule)?
             }
         };
-
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         // Evaluate expr and locate partition.
         let partitions = {
             let _locate_timer = 
PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM
                 .with_label_values(&["locate"])
                 .start_timer();
             df_partition_rule
-                .locate_partitions_for_read(request.predicate.exprs())
+                .locate_partitions_for_read(request.predicate.exprs(), &mut 
partitioned_key_indices)
                 .box_err()
                 .context(LocatePartitions)?
         };
diff --git a/src/partition_table_engine/src/scan_builder.rs 
b/src/partition_table_engine/src/scan_builder.rs
index 27281c0a..25d080d3 100644
--- a/src/partition_table_engine/src/scan_builder.rs
+++ b/src/partition_table_engine/src/scan_builder.rs
@@ -27,13 +27,16 @@ use datafusion::{
 use 
df_engine_extensions::dist_sql_query::physical_plan::UnresolvedPartitionedScan;
 use table_engine::{
     partition::{
-        format_sub_partition_table_name, 
rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo,
+        format_sub_partition_table_name,
+        rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
+        PartitionInfo,
     },
     provider::TableScanBuilder,
     remote::model::TableIdentifier,
     table::ReadRequest,
 };
 
+use crate::partitioned_predicates;
 #[derive(Debug)]
 pub struct PartitionedTableScanBuilder {
     table_name: String,
@@ -61,13 +64,13 @@ impl PartitionedTableScanBuilder {
         &self,
         table_name: &str,
         partition_info: &PartitionInfo,
-        partitions: Vec<usize>,
+        partitions: &[usize],
     ) -> Vec<TableIdentifier> {
         let definitions = partition_info.get_definitions();
         partitions
-            .into_iter()
+            .iter()
             .map(|p| {
-                let partition_name = &definitions[p].name;
+                let partition_name = &definitions[*p].name;
                 TableIdentifier {
                     catalog: self.catalog_name.clone(),
                     schema: self.schema_name.clone(),
@@ -89,18 +92,226 @@ impl TableScanBuilder for PartitionedTableScanBuilder {
                     DataFusionError::Internal(format!("failed to build 
partition rule, err:{e}"))
                 })?;
 
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         // Evaluate expr and locate partition.
         let partitions = df_partition_rule
-            .locate_partitions_for_read(request.predicate.exprs())
+            .locate_partitions_for_read(request.predicate.exprs(), &mut 
partitioned_key_indices)
             .map_err(|e| {
                 DataFusionError::Internal(format!("failed to locate partition 
for read, err:{e}"))
             })?;
+
         let sub_tables =
-            self.get_sub_table_idents(&self.table_name, &self.partition_info, 
partitions);
+            self.get_sub_table_idents(&self.table_name, &self.partition_info, 
&partitions);
+
+        let predicates = if partitioned_key_indices.len() == partitions.len() {
+            Some(
+                partitioned_predicates(
+                    request.predicate.clone(),
+                    &partitions,
+                    &mut partitioned_key_indices,
+                )
+                .map_err(|e| {
+                    DataFusionError::Internal(format!("partition predicates 
failed, err:{e}"))
+                })?,
+            )
+        } else {
+            // Since FilterExtractor.extract only covers some specific expr
+            // cases, partitioned_key_indices.len() could be 0.
+            // All partition requests will then have the same predicate.
+            None
+        };
 
         // Build plan.
-        let plan = UnresolvedPartitionedScan::new(&self.table_name, 
sub_tables, request);
+        let plan =
+            UnresolvedPartitionedScan::new(&self.table_name, sub_tables, 
request, predicates);
 
         Ok(Arc::new(plan))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_types::{column_schema::Builder as ColBuilder, datum::DatumKind, 
schema::Builder};
+    use datafusion::logical_expr::{binary_expr, in_list, Expr, Operator};
+    use table_engine::{
+        partition::{
+            rule::df_adapter::{DfPartitionRuleAdapter, 
PartitionedFilterKeyIndex},
+            KeyPartitionInfo, PartitionDefinition, PartitionInfo,
+        },
+        predicate::PredicateBuilder,
+    };
+
+    use crate::partitioned_predicates;
+
+    #[test]
+    fn test_partitioned_predicate() {
+        // conditions:
+        //   1) table schema: col_ts, col1, col2, in which col1 and col2 are 
both keys,
+        //      and with two partitions
+        //   2) sql: select * from table where col1 = '33' and col2 in ("aa", 
"bb",
+        //      "cc", "dd")
+        // partition expectations:
+        //   1) query fit in two partitions
+        //   2) yield two predicates,  p0: col1 = '33' and col2 in ("aa", 
"bb", "cc");
+        //      p1: col1 = '33' and col2 in ("dd")
+        let definitions = vec![
+            PartitionDefinition {
+                name: "p1".to_string(),
+                origin_name: None,
+            },
+            PartitionDefinition {
+                name: "p2".to_string(),
+                origin_name: None,
+            },
+        ];
+
+        let partition_info = PartitionInfo::Key(KeyPartitionInfo {
+            version: 0,
+            definitions,
+            partition_key: vec!["col1".to_string(), "col2".to_string()],
+            linear: false,
+        });
+
+        let schema = {
+            let builder = Builder::new();
+            let col_ts = ColBuilder::new("col_ts".to_string(), 
DatumKind::Timestamp)
+                .build()
+                .expect("ts");
+            let col1 = ColBuilder::new("col1".to_string(), DatumKind::String)
+                .build()
+                .expect("should succeed to build column schema");
+            let col2 = ColBuilder::new("col2".to_string(), DatumKind::String)
+                .build()
+                .expect("should succeed to build column schema");
+            let col3 = ColBuilder::new("col3".to_string(), DatumKind::String)
+                .build()
+                .expect("should succeed to build column schema");
+            builder
+                .auto_increment_column_id(true)
+                .add_key_column(col_ts)
+                .unwrap()
+                .add_key_column(col1)
+                .unwrap()
+                .add_key_column(col2)
+                .unwrap()
+                .add_key_column(col3)
+                .unwrap()
+                .primary_key_indexes(vec![1, 2])
+                .build()
+                .unwrap()
+        };
+
+        let df_partition_rule = DfPartitionRuleAdapter::new(partition_info, 
&schema).unwrap();
+
+        let exprs = vec![
+            binary_expr(
+                Expr::Column("col1".into()),
+                Operator::Eq,
+                Expr::Literal("33".into()),
+            ),
+            in_list(
+                Expr::Column("col2".into()),
+                vec![
+                    Expr::Literal("aa".into()),
+                    Expr::Literal("bb".into()),
+                    Expr::Literal("cc".into()),
+                    Expr::Literal("dd".into()),
+                ],
+                false,
+            ),
+            in_list(
+                Expr::Column("col3".into()),
+                vec![
+                    Expr::Literal("1".into()),
+                    Expr::Literal("2".into()),
+                    Expr::Literal("3".into()),
+                    Expr::Literal("4".into()),
+                ],
+                false,
+            ),
+        ];
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
+        let partitions = df_partition_rule
+            .locate_partitions_for_read(&exprs, &mut partitioned_key_indices)
+            .unwrap();
+        assert!(partitions.len() == 2);
+        assert!(partitioned_key_indices.len() == 2);
+
+        let predicate = PredicateBuilder::default()
+            .add_pushdown_exprs(exprs.as_slice())
+            .build();
+
+        let predicates = partitioned_predicates(
+            predicate,
+            partitions.as_slice(),
+            &mut partitioned_key_indices,
+        );
+        assert!(predicates.is_ok());
+        let predicates = predicates.unwrap();
+        assert!(predicates.len() == 2);
+
+        assert!(predicates[0].exprs().len() == 3);
+        assert!(
+            predicates[0].exprs()[0]
+                == binary_expr(
+                    Expr::Column("col1".into()),
+                    Operator::Eq,
+                    Expr::Literal("33".into())
+                )
+        );
+        assert!(
+            predicates[0].exprs()[1]
+                == in_list(
+                    Expr::Column("col2".into()),
+                    vec![
+                        Expr::Literal("aa".into()),
+                        Expr::Literal("bb".into()),
+                        Expr::Literal("cc".into()),
+                    ],
+                    false,
+                )
+        );
+        assert!(
+            predicates[0].exprs()[2]
+                == in_list(
+                    Expr::Column("col3".into()),
+                    vec![
+                        Expr::Literal("1".into()),
+                        Expr::Literal("2".into()),
+                        Expr::Literal("3".into()),
+                        Expr::Literal("4".into()),
+                    ],
+                    false,
+                )
+        );
+        assert!(
+            predicates[1].exprs()[0]
+                == binary_expr(
+                    Expr::Column("col1".into()),
+                    Operator::Eq,
+                    Expr::Literal("33".into())
+                )
+        );
+        assert!(
+            predicates[1].exprs()[1]
+                == in_list(
+                    Expr::Column("col2".into()),
+                    vec![Expr::Literal("dd".into()),],
+                    false,
+                )
+        );
+        assert!(
+            predicates[1].exprs()[2]
+                == in_list(
+                    Expr::Column("col3".into()),
+                    vec![
+                        Expr::Literal("1".into()),
+                        Expr::Literal("2".into()),
+                        Expr::Literal("3".into()),
+                        Expr::Literal("4".into()),
+                    ],
+                    false,
+                )
+        );
+    }
+}
diff --git a/src/table_engine/src/engine.rs b/src/table_engine/src/engine.rs
index a9ea1337..0f81e0c2 100644
--- a/src/table_engine/src/engine.rs
+++ b/src/table_engine/src/engine.rs
@@ -97,6 +97,9 @@ pub enum Error {
         msg: Option<String>,
         source: GenericError,
     },
+
+    #[snafu(display("Invalid partiton context, err:{}", msg))]
+    InvalidPartitionContext { msg: String },
 }
 
 define_result!(Error);
diff --git a/src/table_engine/src/partition/rule/df_adapter/extractor.rs 
b/src/table_engine/src/partition/rule/df_adapter/extractor.rs
index ff6c393d..78007500 100644
--- a/src/table_engine/src/partition/rule/df_adapter/extractor.rs
+++ b/src/table_engine/src/partition/rule/df_adapter/extractor.rs
@@ -16,14 +16,16 @@
 // under the License.
 
 //! Partition filter extractor
-
 use std::collections::HashSet;
 
 use common_types::datum::Datum;
 use datafusion::logical_expr::{expr::InList, Expr, Operator};
 use df_operator::visitor::find_columns_by_expr;
 
-use crate::partition::rule::filter::{PartitionCondition, PartitionFilter};
+use crate::partition::rule::{
+    df_adapter::IndexedPartitionFilter,
+    filter::{PartitionCondition, PartitionFilter},
+};
 
 /// The datafusion filter exprs extractor
 ///
@@ -36,13 +38,13 @@ use crate::partition::rule::filter::{PartitionCondition, 
PartitionFilter};
 /// For example: [KeyRule] and [KeyExtractor].
 /// If they are not related, [PartitionRule] may not take effect.
 pub trait FilterExtractor: Send + Sync + 'static {
-    fn extract(&self, filters: &[Expr], columns: &[String]) -> 
Vec<PartitionFilter>;
+    fn extract(&self, filters: &[Expr], columns: &[String]) -> 
IndexedPartitionFilter;
 }
 
 pub struct NoopExtractor;
 
 impl FilterExtractor for NoopExtractor {
-    fn extract(&self, _filters: &[Expr], _columns: &[String]) -> 
Vec<PartitionFilter> {
+    fn extract(&self, _filters: &[Expr], _columns: &[String]) -> 
IndexedPartitionFilter {
         vec![]
     }
 }
@@ -50,13 +52,14 @@ impl FilterExtractor for NoopExtractor {
 pub struct KeyExtractor;
 
 impl FilterExtractor for KeyExtractor {
-    fn extract(&self, filters: &[Expr], columns: &[String]) -> 
Vec<PartitionFilter> {
+    fn extract(&self, filters: &[Expr], columns: &[String]) -> 
IndexedPartitionFilter {
+        // PartitionFilter indices may not be the same as filter indices
         if filters.is_empty() {
             return Vec::default();
         }
 
         let mut target = Vec::with_capacity(filters.len());
-        for filter in filters {
+        for (index, filter) in filters.iter().enumerate() {
             // If no target columns included in `filter`, ignore this `filter`.
             let columns_in_filter = find_columns_by_expr(filter)
                 .into_iter()
@@ -78,7 +81,6 @@ impl FilterExtractor for KeyExtractor {
 
             // Finally, we try to convert `filter` to `PartitionFilter`.
             // We just support the simple situation: "colum = value" now.
-            // TODO: support "colum in [value list]".
             // TODO: we need to compare and check the datatype of column and 
value.
             // (Actually, there is type conversion on high-level, but when 
converted data
             // is overflow, it may take no effect).
@@ -126,7 +128,7 @@ impl FilterExtractor for KeyExtractor {
             };
 
             if let Some(pf) = partition_filter {
-                target.push(pf);
+                target.push((index, pf));
             }
         }
 
@@ -157,7 +159,7 @@ mod tests {
             column: "col1".to_string(),
             condition: PartitionCondition::Eq(Datum::Int32(42)),
         };
-        assert_eq!(partition_filter.first().unwrap(), &expected);
+        assert_eq!(partition_filter.first().unwrap().1, expected);
 
         // Other expr will be rejected now.
         let rejected_expr = 
col("col1").gt(Expr::Literal(ScalarValue::Int32(Some(42))));
@@ -182,7 +184,7 @@ mod tests {
             column: "col1".to_string(),
             condition: PartitionCondition::In(vec![Datum::Int32(42), 
Datum::Int32(38)]),
         };
-        assert_eq!(partition_filter.first().unwrap(), &expected);
+        assert_eq!(partition_filter.first().unwrap().1, expected);
     }
 
     #[test]
diff --git a/src/table_engine/src/partition/rule/df_adapter/mod.rs 
b/src/table_engine/src/partition/rule/df_adapter/mod.rs
index 8e95f856..1b5df6ca 100644
--- a/src/table_engine/src/partition/rule/df_adapter/mod.rs
+++ b/src/table_engine/src/partition/rule/df_adapter/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Partition rule datafusion adapter
+use std::collections::{BTreeSet, HashMap};
 
 use common_types::{row::RowGroup, schema::Schema};
 use datafusion::logical_expr::Expr;
@@ -23,14 +24,22 @@ use datafusion::logical_expr::Expr;
 use self::extractor::{KeyExtractor, NoopExtractor};
 use crate::partition::{
     rule::{
-        df_adapter::extractor::FilterExtractorRef, 
factory::PartitionRuleFactory, PartitionRulePtr,
-        PartitionedRows,
+        df_adapter::extractor::FilterExtractorRef, 
factory::PartitionRuleFactory,
+        filter::PartitionFilter, PartitionRulePtr, PartitionedRows,
     },
     BuildPartitionRule, PartitionInfo, Result,
 };
 
 mod extractor;
 
+pub type PartitionId = usize; // partition number (id)
+pub type FilterIndex = usize; // filter (or expr) index regarding predicate.exprs()
+pub type KeyIndex = usize; // key index regarding inlist expr
+pub type FilterKeyIndex = HashMap<FilterIndex, BTreeSet<KeyIndex>>;
+pub type PartitionedFilterKeyIndex = HashMap<PartitionId, FilterKeyIndex>;
+pub type IndexedPartitionFilter = Vec<(usize, PartitionFilter)>;
+pub type IndexedPartitionFilterRef<'a> = &'a [(usize, PartitionFilter)];
+
 /// Partition rule's adapter for datafusion
 pub struct DfPartitionRuleAdapter {
     /// Partition rule
@@ -56,12 +65,17 @@ impl DfPartitionRuleAdapter {
         self.rule.location_partitions_for_write(row_group)
     }
 
-    pub fn locate_partitions_for_read(&self, filters: &[Expr]) -> 
Result<Vec<usize>> {
+    pub fn locate_partitions_for_read(
+        &self,
+        filters: &[Expr],
+        partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>> {
         // Extract partition filters from datafusion filters.
         let partition_filters = self.extractor.extract(filters, 
self.columns());
 
         // Locate partitions from filters.
-        self.rule.locate_partitions_for_read(&partition_filters)
+        self.rule
+            .locate_partitions_for_read(&partition_filters, 
partitioned_key_indices)
     }
 
     fn create_extractor(partition_info: &PartitionInfo) -> 
Result<FilterExtractorRef> {
@@ -116,8 +130,9 @@ mod tests {
         // Basic flow
         let key_rule_adapter =
             DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), 
&schema).unwrap();
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         let partitions = key_rule_adapter
-            .locate_partitions_for_read(&valid_filters_1)
+            .locate_partitions_for_read(&valid_filters_1, &mut 
partitioned_key_indices)
             .unwrap();
 
         let partition_keys = [
@@ -132,7 +147,7 @@ mod tests {
 
         // Conflict filter and empty partitions
         let partitions = key_rule_adapter
-            .locate_partitions_for_read(&valid_filters_2)
+            .locate_partitions_for_read(&valid_filters_2, &mut 
partitioned_key_indices)
             .unwrap();
 
         assert!(partitions.is_empty());
@@ -161,12 +176,13 @@ mod tests {
         let key_rule_adapter =
             DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), 
&schema).unwrap();
 
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         // Partitions located from invalid filters.
         let partitions_1 = key_rule_adapter
-            .locate_partitions_for_read(&invalid_filters_1)
+            .locate_partitions_for_read(&invalid_filters_1, &mut 
partitioned_key_indices)
             .unwrap();
         let partitions_2 = key_rule_adapter
-            .locate_partitions_for_read(&invalid_filters_2)
+            .locate_partitions_for_read(&invalid_filters_2, &mut 
partitioned_key_indices)
             .unwrap();
 
         // Expected
diff --git a/src/table_engine/src/partition/rule/key.rs 
b/src/table_engine/src/partition/rule/key.rs
index 70a3038f..7692c800 100644
--- a/src/table_engine/src/partition/rule/key.rs
+++ b/src/table_engine/src/partition/rule/key.rs
@@ -30,7 +30,9 @@ use snafu::OptionExt;
 
 use crate::partition::{
     rule::{
-        filter::PartitionCondition, PartitionFilter, PartitionRule, 
PartitionedRow, PartitionedRows,
+        df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex},
+        filter::PartitionCondition,
+        PartitionFilter, PartitionRule, PartitionedRow, PartitionedRows,
     },
     Internal, LocateWritePartition, Result,
 };
@@ -139,15 +141,29 @@ impl KeyRule {
         &self,
         group: &[usize],
         filters: &[PartitionFilter],
-    ) -> Result<BTreeSet<usize>> {
-        let mut partitions = BTreeSet::new();
+    ) -> Result<PartitionedFilterKeyIndex> {
+        // Retrieve all the key DatumView instances along with their corresponding
+        // indices related to their positions in the predicate inlist.
         let expanded_group = expand_partition_keys_group(group, filters)?;
-        for partition_keys in expanded_group {
-            let partition = compute_partition(partition_keys.into_iter(), 
self.partition_num);
-            partitions.insert(partition);
+
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
+        for indexed_partition_keys in expanded_group {
+            // batch all the keys for hash computation
+            let partition_keys = indexed_partition_keys.iter().map(|item| 
item.1.clone());
+            let partition = compute_partition(partition_keys, 
self.partition_num);
+
+            // collect all the key indices related to all predicate expr in the target
+            // partition
+            let filter_inlist_indices = 
partitioned_key_indices.entry(partition).or_default();
+            for (index, item) in indexed_partition_keys.iter().enumerate() {
+                filter_inlist_indices
+                    .entry(group[index])
+                    .or_default()
+                    .insert(item.0);
+            }
         }
 
-        Ok(partitions)
+        Ok(partitioned_key_indices)
     }
 
     #[inline]
@@ -189,16 +205,25 @@ impl PartitionRule for KeyRule {
         Ok(PartitionedRows::Multiple(Box::new(iter)))
     }
 
-    fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> 
Result<Vec<usize>> {
+    fn locate_partitions_for_read(
+        &self,
+        indexed_filters: IndexedPartitionFilterRef,
+        partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>> {
         // Filters are empty.
-        if filters.is_empty() {
+        if indexed_filters.is_empty() {
             return Ok(self.all_partitions());
         }
 
+        let filters = indexed_filters
+            .iter()
+            .map(|(_idx, filter)| filter.clone())
+            .collect::<Vec<_>>();
+
         // Group the filters by their columns.
         // If found invalid filter, return all partitions.
         let candidate_partition_keys_groups = self
-            .get_candidate_partition_keys_groups(filters)
+            .get_candidate_partition_keys_groups(&filters)
             .map_err(|e| {
                 error!("KeyRule locate partition for read, err:{}", e);
             })
@@ -208,11 +233,31 @@ impl PartitionRule for KeyRule {
         }
 
         let (first_group, rest_groups) = 
candidate_partition_keys_groups.split_first().unwrap();
-        let mut target_partitions = 
self.compute_partition_for_keys_group(first_group, filters)?;
+        let mut partitioned_key_indices_all =
+            self.compute_partition_for_keys_group(first_group, 
filters.as_slice())?;
+        let mut target_partitions: BTreeSet<usize> =
+            partitioned_key_indices_all.keys().copied().collect();
         for group in rest_groups {
             // Same as above, if found invalid, return all partitions.
-            let partitions = match 
self.compute_partition_for_keys_group(group, filters) {
-                Ok(partitions) => partitions,
+            let partitions = match 
self.compute_partition_for_keys_group(group, filters.as_slice())
+            {
+                Ok(partitioned_filter_key_index_rest) => {
+                    for (partition_rest, filter_key_index_rest) in
+                        &partitioned_filter_key_index_rest
+                    {
+                        // merge all the rest key indices.
+                        let filter_key_index = partitioned_key_indices_all
+                            .entry(*partition_rest)
+                            .or_default();
+                        for item in filter_key_index_rest {
+                            filter_key_index
+                                .entry(*item.0)
+                                .or_default()
+                                .extend(item.1.iter());
+                        }
+                    }
+                    partitioned_filter_key_index_rest.keys().copied().collect()
+                }
                 Err(e) => {
                     error!("KeyRule locate partition for read, err:{}", e);
                     return Ok(self.all_partitions());
@@ -225,6 +270,8 @@ impl PartitionRule for KeyRule {
                 .collect::<BTreeSet<_>>();
         }
 
+        partitioned_key_indices.extend(partitioned_key_indices_all);
+
         Ok(target_partitions.into_iter().collect())
     }
 }
@@ -232,7 +279,7 @@ impl PartitionRule for KeyRule {
 fn expand_partition_keys_group<'a>(
     group: &[usize],
     filters: &'a [PartitionFilter],
-) -> Result<impl Iterator<Item = Vec<DatumView<'a>>>> {
+) -> Result<impl Iterator<Item = Vec<(usize, DatumView<'a>)>>> {
     let mut datum_by_columns = Vec::with_capacity(group.len());
     for filter_idx in group {
         let filter = &filters[*filter_idx];
@@ -252,7 +299,7 @@ fn expand_partition_keys_group<'a>(
 
     Ok(datum_by_columns
         .into_iter()
-        .map(|filters| filters.into_iter())
+        .map(|filters| filters.into_iter().enumerate())
         .multi_cartesian_product())
 }
 
@@ -546,7 +593,7 @@ mod tests {
         // Expanded group
         let expanded_group = expand_partition_keys_group(&group, &filters)
             .unwrap()
-            .map(|v| v.iter().map(|view| view.to_datum()).collect_vec())
+            .map(|v| v.iter().map(|view| view.1.to_datum()).collect_vec())
             .collect_vec();
         let expected = vec![
             vec![Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)],
diff --git a/src/table_engine/src/partition/rule/mod.rs 
b/src/table_engine/src/partition/rule/mod.rs
index a1c138f3..b17ec85f 100644
--- a/src/table_engine/src/partition/rule/mod.rs
+++ b/src/table_engine/src/partition/rule/mod.rs
@@ -22,10 +22,12 @@ mod factory;
 mod filter;
 mod key;
 mod random;
-
 use common_types::row::{Row, RowGroup};
 
-use self::filter::PartitionFilter;
+use self::{
+    df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex},
+    filter::PartitionFilter,
+};
 use crate::partition::Result;
 
 /// The partitioned rows of the written requests.
@@ -67,7 +69,11 @@ pub trait PartitionRule: Send + Sync + 'static {
     /// passed here.
     ///
     /// If unexpected filters still found, all partitions will be returned.
-    fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> 
Result<Vec<usize>>;
+    fn locate_partitions_for_read(
+        &self,
+        indexed_filters: IndexedPartitionFilterRef,
+        partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>>;
 }
 
 pub type PartitionRulePtr = Box<dyn PartitionRule>;
diff --git a/src/table_engine/src/partition/rule/random.rs 
b/src/table_engine/src/partition/rule/random.rs
index 0be8804f..d1e84a03 100644
--- a/src/table_engine/src/partition/rule/random.rs
+++ b/src/table_engine/src/partition/rule/random.rs
@@ -21,7 +21,10 @@ use common_types::row::RowGroup;
 use itertools::Itertools;
 
 use crate::partition::{
-    rule::{filter::PartitionFilter, PartitionRule, PartitionedRows},
+    rule::{
+        df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex},
+        PartitionRule, PartitionedRows,
+    },
     Result,
 };
 
@@ -47,7 +50,11 @@ impl PartitionRule for RandomRule {
         })
     }
 
-    fn locate_partitions_for_read(&self, _filters: &[PartitionFilter]) -> 
Result<Vec<usize>> {
+    fn locate_partitions_for_read(
+        &self,
+        _indexed_filters: IndexedPartitionFilterRef,
+        _partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>> {
         Ok((0..self.partition_num).collect_vec())
     }
 }
diff --git a/src/table_engine/src/predicate.rs 
b/src/table_engine/src/predicate.rs
index 3a3294fc..37c09e9d 100644
--- a/src/table_engine/src/predicate.rs
+++ b/src/table_engine/src/predicate.rs
@@ -104,6 +104,10 @@ impl Predicate {
         &self.exprs
     }
 
+    pub fn mut_exprs(&mut self) -> &mut [Expr] {
+        &mut self.exprs
+    }
+
     pub fn time_range(&self) -> TimeRange {
         self.time_range
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to