This is an automated email from the ASF dual-hosted git repository.
maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new 52d87cdc8e9 feat: support runtime filter push down to Table AM
52d87cdc8e9 is described below
commit 52d87cdc8e902f79d3e4e1db894d5d42bafa7a2c
Author: GongXun <[email protected]>
AuthorDate: Thu Aug 21 16:42:13 2025 +0800
feat: support runtime filter push down to Table AM
When a seq scan begins, check whether the table AM's scan flags include
runtime-filter support to determine whether the runtime filter is pushed down.
When the runtime filter is pushed down to the PAX AM, PAX converts the min/max
scan keys in the runtime filter into PFTNodes and performs min/max filtering.
---
.../src/cpp/access/pax_access_handle.cc | 1 +
contrib/pax_storage/src/cpp/access/pax_scanner.cc | 10 ++--
.../src/cpp/storage/filter/pax_filter.cc | 3 +-
.../src/cpp/storage/filter/pax_filter.h | 2 +-
.../src/cpp/storage/filter/pax_sparse_filter.h | 4 +-
.../src/cpp/storage/filter/pax_sparse_pg_path.cc | 62 +++++++++++++++++++++-
src/backend/executor/nodeSeqscan.c | 14 +++--
7 files changed, 82 insertions(+), 14 deletions(-)
diff --git a/contrib/pax_storage/src/cpp/access/pax_access_handle.cc
b/contrib/pax_storage/src/cpp/access/pax_access_handle.cc
index da07cddd5d7..6dcaecd3205 100644
--- a/contrib/pax_storage/src/cpp/access/pax_access_handle.cc
+++ b/contrib/pax_storage/src/cpp/access/pax_access_handle.cc
@@ -453,6 +453,7 @@ uint32 PaxAccessMethod::ScanFlags(Relation relation) {
flags |= SCAN_FORCE_BIG_WRITE_LOCK;
#endif
+ flags |= SCAN_SUPPORT_RUNTIME_FILTER;
return flags;
}
diff --git a/contrib/pax_storage/src/cpp/access/pax_scanner.cc
b/contrib/pax_storage/src/cpp/access/pax_scanner.cc
index a5e0b632002..5a354e6fa0c 100644
--- a/contrib/pax_storage/src/cpp/access/pax_scanner.cc
+++ b/contrib/pax_storage/src/cpp/access/pax_scanner.cc
@@ -218,7 +218,7 @@ bool PaxScanDesc::BitmapNextTuple(struct TBMIterateResult
*tbmres,
}
TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
- int nkeys, struct ScanKeyData * /*key*/,
+ int nkeys, struct ScanKeyData *key,
ParallelTableScanDesc pscan, uint32 flags,
std::shared_ptr<PaxFilter> &&pax_filter,
bool build_bitmap) {
@@ -326,8 +326,8 @@ void PaxScanDesc::EndScan() {
}
TableScanDesc PaxScanDesc::BeginScanExtractColumns(
- Relation rel, Snapshot snapshot, int /*nkeys*/,
- struct ScanKeyData * /*key*/, ParallelTableScanDesc parallel_scan,
+ Relation rel, Snapshot snapshot, int nkeys,
+ struct ScanKeyData *key, ParallelTableScanDesc parallel_scan,
struct PlanState *ps, uint32 flags) {
std::shared_ptr<PaxFilter> filter;
List *targetlist = ps->plan->targetlist;
@@ -361,7 +361,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
filter->SetColumnProjection(std::move(col_bits));
if (pax_enable_sparse_filter) {
- filter->InitSparseFilter(rel, qual);
+ filter->InitSparseFilter(rel, qual, key, nkeys);
// FIXME: enable predicate pushdown can filter rows immediately without
// assigning all columns. But it may mess the filter orders for multiple
@@ -375,7 +375,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
filter->InitRowFilter(rel, ps, filter->GetColumnProjection());
}
}
- return BeginScan(rel, snapshot, 0, nullptr, parallel_scan, flags,
+ return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags,
std::move(filter), build_bitmap);
}
diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc
b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc
index d7f752a33ec..5f19ab58400 100644
--- a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc
+++ b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc
@@ -44,11 +44,12 @@ namespace pax {
PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {}
void PaxFilter::InitSparseFilter(Relation relation, List *quals,
+ ScanKey key, int nkeys,
bool allow_fallback_to_pg) {
Assert(!sparse_filter_);
sparse_filter_ =
std::make_shared<PaxSparseFilter>(relation, allow_fallback_to_pg);
- sparse_filter_->Initialize(quals);
+ sparse_filter_->Initialize(quals, key, nkeys);
}
#ifdef VEC_BUILD
diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h
b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h
index 467b841ec89..ebc2fff8538 100644
--- a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h
+++ b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h
@@ -50,7 +50,7 @@ class PaxFilter final {
~PaxFilter() = default;
// The sparse filter
- void InitSparseFilter(Relation relation, List *quals,
+ void InitSparseFilter(Relation relation, List *quals, ScanKey key, int nkeys,
bool allow_fallback_to_pg = false);
#ifdef VEC_BUILD
void InitSparseFilter(
diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_filter.h
b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_filter.h
index 504878c4dd2..6efa59a7ff6 100644
--- a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_filter.h
+++ b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_filter.h
@@ -65,7 +65,7 @@ class PaxSparseFilter final {
bool ExistsFilterPath() const;
- void Initialize(List *quals);
+ void Initialize(List *quals, ScanKey key, int nkeys);
#ifdef VEC_BUILD
void Initialize(
@@ -83,6 +83,8 @@ class PaxSparseFilter final {
private:
#endif
+ std::shared_ptr<PFTNode> ProcessScanKey(ScanKey key);
+
// Used to build the filter tree with the PG quals
std::shared_ptr<PFTNode> ExprWalker(Expr *expr);
Expr *ExprFlatVar(Expr *expr);
diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc
b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc
index 0630db6dc21..3a7bc64f389 100644
--- a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc
+++ b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc
@@ -36,7 +36,7 @@
namespace pax {
-void PaxSparseFilter::Initialize(List *quals) {
+void PaxSparseFilter::Initialize(List *quals, ScanKey key, int nkeys) {
ListCell *qual_cell;
std::vector<std::shared_ptr<PFTNode>> fl_nodes; /* first level nodes */
std::string origin_tree_str;
@@ -44,10 +44,27 @@ void PaxSparseFilter::Initialize(List *quals) {
// no inited
Assert(!filter_tree_);
- if (!quals) {
+ if (!quals && nkeys == 0) {
return;
}
+ // walk scan key and only support min/max filter now
+ for (int i = 0; i < nkeys; i++) {
+ // TODO: support bloom filter in PaxFilter
+ // but now just skip it, SeqNext() will check bloom filter in
PassByBloomFilter()
+ if (key[i].sk_flags & SK_BLOOM_FILTER) {
+ continue;
+ }
+
+ if (key[i].sk_strategy != BTGreaterEqualStrategyNumber &&
+ key[i].sk_strategy != BTLessEqualStrategyNumber) {
+ continue;
+ }
+ std::shared_ptr<PFTNode> fl_node = ProcessScanKey(&key[i]);
+ Assert(fl_node);
+ fl_nodes.emplace_back(std::move(fl_node));
+ }
+
foreach (qual_cell, quals) {
Expr *fl_clause = (Expr *)lfirst(qual_cell);
std::shared_ptr<PFTNode> fl_node = ExprWalker(fl_clause);
@@ -67,6 +84,47 @@ void PaxSparseFilter::Initialize(List *quals) {
origin_tree_str.c_str(), DebugString().c_str());
}
+std::shared_ptr<PFTNode> PaxSparseFilter::ProcessScanKey(ScanKey key) {
+ std::shared_ptr<PFTNode> node = nullptr;
+ Assert(key);
+ Assert(!(key->sk_flags & SK_BLOOM_FILTER));
+ Assert(key->sk_strategy == BTGreaterEqualStrategyNumber ||
+ key->sk_strategy == BTLessEqualStrategyNumber);
+ Assert(key->sk_attno > 0 &&
+ key->sk_attno <= RelationGetNumberOfAttributes(rel_));
+ // Only non-bloom >= / <= keys reach here; build OpNode(var, const) from one.
+ AttrNumber attno = key->sk_attno;
+
+ // Left operand: a VarNode referencing the scan key's column (sk_attno)
+ auto var_node = std::make_shared<VarNode>();
+ var_node->attrno = attno;
+
+ // Right operand: a ConstNode carrying sk_argument, typed by sk_subtype
+ auto const_node = std::make_shared<ConstNode>();
+ const_node->const_val = key->sk_argument;
+ const_node->const_type = key->sk_subtype;
+ if (key->sk_flags & SK_ISNULL) {  // carry the NULL-argument flag to the const
+ const_node->sk_flags |= SK_ISNULL;
+ }
+
+ // Operator node: takes the key's strategy and collation; children (var, const)
+ auto op_node = std::make_shared<OpNode>();
+ op_node->strategy = key->sk_strategy;
+ op_node->collation = key->sk_collation; // may be InvalidOid; executor will
+ // fall back to attr collation
+
+ // Operand types: left from the column's atttypid, right from sk_subtype
+ Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(rel_), attno - 1);
+ op_node->left_typid = attr->atttypid;
+ op_node->right_typid = key->sk_subtype;
+
+ PFTNode::AppendSubNode(op_node, std::move(var_node));
+ PFTNode::AppendSubNode(op_node, std::move(const_node));
+
+ node = op_node;
+ return node;
+}
+
Expr *PaxSparseFilter::ExprFlatVar(Expr *clause) {
Expr *flat_clause = clause;
if (unlikely(!clause)) {
diff --git a/src/backend/executor/nodeSeqscan.c
b/src/backend/executor/nodeSeqscan.c
index c9debfb1fd0..7ee5a2e94cf 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -80,13 +80,18 @@ SeqNext(SeqScanState *node)
* node->filter_in_seqscan is false means scankey need to be
pushed to
* AM.
*/
- if (gp_enable_runtime_filter_pushdown &&
!node->filter_in_seqscan)
+ if (gp_enable_runtime_filter_pushdown &&
node->filter_in_seqscan && node->filters &&
+ (table_scan_flags(node->ss.ss_currentRelation) &
+ (SCAN_SUPPORT_RUNTIME_FILTER)))
+ {
+ // pushdown runtime filter to AM
keys = ScanKeyListToArray(node->filters, &nkeys);
+ }
/*
- * We reach here if the scan is not parallel, or if we're
serially
- * executing a scan that was planned to be parallel.
- */
+ * We reach here if the scan is not parallel, or if we're
serially
+ * executing a scan that was planned to be parallel.
+ */
scandesc = table_beginscan_es(node->ss.ss_currentRelation,
estate->es_snapshot,
nkeys, keys,
@@ -102,6 +107,7 @@ SeqNext(SeqScanState *node)
{
while (table_scan_getnextslot(scandesc, direction, slot))
{
+ // TODO: later pushdown bloom filter to AM
if (!PassByBloomFilter(&node->ss.ps, node->filters,
slot))
continue;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]