This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/mor_value_predicate_pushdown_control in repository https://gitbox.apache.org/repos/asf/doris.git
commit 40b0399b0f9df305ab31c526693cd80b7af1b6d5 Author: Yongqiang YANG <[email protected]> AuthorDate: Wed Feb 4 09:23:40 2026 -0800 [feature](scan) Add session variable to control value predicate pushdown for MOR tables Add a new session variable `enable_mor_value_predicate_pushdown_tables` to allow users to selectively enable value column predicate pushdown for MOR (Merge-On-Read) tables. This can improve query performance by utilizing inverted indexes on value columns for filtering. The session variable accepts: - Comma-separated table names: `db1.tbl1,db2.tbl2` (a bare name such as `tbl1`, without a database prefix, matches a table of that name in any database; matching is case-insensitive) - Wildcard for all MOR tables: `*` (only when it is the entire value) - Empty string to disable (default) The setting has no effect on non-MOR tables. Changes: - Add session variable in SessionVariable.java with helper method - Add isMorTable() helper in OlapTable.java - Add Thrift field enable_mor_value_predicate_pushdown in TOlapScanNode - Set flag in OlapScanNode.toThrift() based on session variable - Add virtual method _should_push_down_mor_value_predicate() in scan_operator - Implement override in olap_scan_operator to read the flag - Modify predicate pushdown condition in scan_operator.cpp --- be/src/pipeline/exec/olap_scan_operator.cpp | 6 ++++ be/src/pipeline/exec/olap_scan_operator.h | 2 ++ be/src/pipeline/exec/scan_operator.cpp | 3 +- be/src/pipeline/exec/scan_operator.h | 1 + .../java/org/apache/doris/catalog/OlapTable.java | 8 +++++ .../org/apache/doris/planner/OlapScanNode.java | 9 +++++ .../java/org/apache/doris/qe/SessionVariable.java | 39 ++++++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 2 ++ 8 files changed, 69 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 46aa38577bf..2854df5e3e1 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -445,6 +445,12 @@ bool OlapScanLocalState::_storage_no_merge() { p._olap_scan_node.enable_unique_key_merge_on_write)); } +bool 
OlapScanLocalState::_should_push_down_mor_value_predicate() { + auto& p = _parent->cast<OlapScanOperatorX>(); + return p._olap_scan_node.__isset.enable_mor_value_predicate_pushdown && + p._olap_scan_node.enable_mor_value_predicate_pushdown; +} + Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) { if (_scan_ranges.empty()) { _eos = true; diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index cd054c66408..5d76865cb4b 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -105,6 +105,8 @@ private: bool _storage_no_merge() override; + bool _should_push_down_mor_value_predicate() override; + bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override { if (!predicate.target_is_slot(_parent->node_id())) { return false; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index e8d8193a289..8453028b1a2 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -439,7 +439,8 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c return Status::OK(); } - if (pdt == PushDownType::ACCEPTABLE && (_is_key_column(slot->col_name()))) { + if (pdt == PushDownType::ACCEPTABLE && + (_is_key_column(slot->col_name()) || _should_push_down_mor_value_predicate())) { output_expr = nullptr; return Status::OK(); } else { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index f3deca4b55e..71ca28683a5 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -205,6 +205,7 @@ protected: virtual bool _storage_no_merge() { return false; } virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; } virtual bool _is_key_column(const std::string& col_name) { return false; } + virtual bool _should_push_down_mor_value_predicate() 
{ return false; } virtual PushDownType _should_push_down_bloom_filter() const { return PushDownType::UNACCEPTABLE; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index bce593597ce..7f9d3938800 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -3021,6 +3021,14 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc return getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite(); } + /** + * Check if this is a MOR (Merge-On-Read) table. + * MOR = UNIQUE_KEYS without merge-on-write enabled. + */ + public boolean isMorTable() { + return getKeysType() == KeysType.UNIQUE_KEYS && !getEnableUniqueKeyMergeOnWrite(); + } + public boolean isUniqKeyMergeOnWriteWithClusterKeys() { return isUniqKeyMergeOnWrite() && getBaseSchema().stream().anyMatch(Column::isClusterKey); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index d5fddd65c38..c982775db9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1181,6 +1181,15 @@ public class OlapScanNode extends ScanNode { msg.olap_scan_node.setTableName(tableName); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); + // Set MOR value predicate pushdown flag based on session variable + if (olapTable.isMorTable() && ConnectContext.get() != null) { + String dbName = olapTable.getQualifiedDbName(); + String tblName = olapTable.getName(); + boolean enabled = ConnectContext.get().getSessionVariable() + .isMorValuePredicatePushdownEnabled(dbName, tblName); + msg.olap_scan_node.setEnable_mor_value_predicate_pushdown(enabled); + } + 
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 025720b0670..3043f4d622f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -722,6 +722,9 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax"; + public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES + = "enable_mor_value_predicate_pushdown_tables"; + // When set use fix replica = true, the fixed replica maybe bad, try to use the health one if // this session variable is set to true. public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt"; @@ -2174,6 +2177,13 @@ public class SessionVariable implements Serializable, Writable { "是否启用 string 类型 min max 下推。", "Set whether to enable push down string type minmax."}) public boolean enablePushDownStringMinMax = false; + // Comma-separated list of MOR tables to enable value predicate pushdown. + @VariableMgr.VarAttr(name = ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES, needForward = true, description = { + "指定启用MOR表value列谓词下推的表列表,格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。", + "Comma-separated list of MOR tables to enable value predicate pushdown. " + + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."}) + public String enableMorValuePredicatePushdownTables = ""; + // Whether drop table when create table as select insert data appear error. 
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -4652,6 +4662,35 @@ public class SessionVariable implements Serializable, Writable { return enablePushDownStringMinMax; } + public String getEnableMorValuePredicatePushdownTables() { + return enableMorValuePredicatePushdownTables; + } + + /** + * Check if a table is enabled for MOR value predicate pushdown. + * @param dbName database name + * @param tableName table name + * @return true if the table is in the enabled list or if '*' is set + */ + public boolean isMorValuePredicatePushdownEnabled(String dbName, String tableName) { + if (enableMorValuePredicatePushdownTables == null + || enableMorValuePredicatePushdownTables.isEmpty()) { + return false; + } + String trimmed = enableMorValuePredicatePushdownTables.trim(); + if ("*".equals(trimmed)) { + return true; + } + String fullName = dbName + "." + tableName; + for (String table : trimmed.split(",")) { + if (table.trim().equalsIgnoreCase(fullName) + || table.trim().equalsIgnoreCase(tableName)) { + return true; + } + } + return false; + } + /** canUseNereidsDistributePlanner */ public static boolean canUseNereidsDistributePlanner() { ConnectContext connectContext = ConnectContext.get(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1369c6e1045..50ec842642a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -887,6 +887,8 @@ struct TOlapScanNode { 20: optional i64 score_sort_limit 21: optional TSortInfo ann_sort_info 22: optional i64 ann_sort_limit + // Enable value predicate pushdown for MOR tables + 23: optional bool enable_mor_value_predicate_pushdown } struct TEqJoinCondition { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
