This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 84f30208b7 [multistage] Introducing dynamic filtering semi-join broker
level configuration (#15402)
84f30208b7 is described below
commit 84f30208b700913c44453312b4da5202c0134955
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Apr 2 12:00:44 2025 -0700
[multistage] Introducing dynamic filtering semi-join broker level
configuration (#15402)
---
.../MultiStageBrokerRequestHandler.java | 4 ++++
.../org/apache/pinot/query/QueryEnvironment.java | 23 +++++++++++++++++++---
.../apache/pinot/spi/utils/CommonConstants.java | 3 +++
3 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 2dedf7b244..44e74ccc70 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -311,6 +311,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
CommonConstants.Broker.DEFAULT_OF_SPOOLS);
boolean defaultEnableGroupTrim =
_config.getProperty(CommonConstants.Broker.CONFIG_OF_MSE_ENABLE_GROUP_TRIM,
CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM);
+ boolean defaultEnableDynamicFilteringSemiJoin = _config.getProperty(
+
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN,
+ CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN);
return QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
@@ -318,6 +321,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
.defaultInferPartitionHint(inferPartitionHint)
.defaultUseSpools(defaultUseSpool)
.defaultEnableGroupTrim(defaultEnableGroupTrim)
+
.defaultEnableDynamicFilteringSemiJoin(defaultEnableDynamicFilteringSemiJoin)
.build();
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 1cec44a987..1e211ec64d 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -55,6 +55,7 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule;
+import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule;
import org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets;
import org.apache.pinot.calcite.rel.rules.PinotRelDistributionTraitRule;
import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
@@ -149,7 +150,7 @@ public class QueryEnvironment {
*/
private PlannerContext getPlannerContext(SqlNodeAndOptions
sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
- HepProgram traitProgram = getTraitProgram(workerManager);
+ HepProgram traitProgram = getTraitProgram(workerManager, _envConfig);
SqlExplainFormat format = SqlExplainFormat.DOT;
if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
@@ -465,7 +466,7 @@ public class QueryEnvironment {
return hepProgramBuilder.build();
}
- private static HepProgram getTraitProgram(@Nullable WorkerManager
workerManager) {
+ private static HepProgram getTraitProgram(@Nullable WorkerManager
workerManager, Config config) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
// Set the match order as BOTTOM_UP.
@@ -474,7 +475,9 @@ public class QueryEnvironment {
// ----
// Run pinot specific rules that should run after all other rules, using 1
HepInstruction per rule.
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
- hepProgramBuilder.addRuleInstance(relOptRule);
+ if (isEligibleQueryPostRule(relOptRule, config)) {
+ hepProgramBuilder.addRuleInstance(relOptRule);
+ }
}
// apply RelDistribution trait to all nodes
@@ -486,6 +489,14 @@ public class QueryEnvironment {
return hepProgramBuilder.build();
}
+ // This method is used to filter out post rules that are not eligible to run
based on the config.
+ private static boolean isEligibleQueryPostRule(RelOptRule relOptRule, Config
config) {
+ if (relOptRule instanceof PinotJoinToDynamicBroadcastRule &&
!config.defaultEnableDynamicFilteringSemiJoin()) {
+ return false;
+ }
+ return true;
+ }
+
public static ImmutableQueryEnvironment.Config.Builder configBuilder() {
return ImmutableQueryEnvironment.Config.builder();
}
@@ -532,11 +543,17 @@ public class QueryEnvironment {
return CommonConstants.Broker.DEFAULT_OF_SPOOLS;
}
+
@Value.Default
default boolean defaultEnableGroupTrim() {
return CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM;
}
+ @Value.Default
+ default boolean defaultEnableDynamicFilteringSemiJoin() {
+ return CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN;
+ }
+
/**
* Returns the worker manager.
*
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index a5a4b60bff..5f6b186f70 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -456,6 +456,9 @@ public class CommonConstants {
public static final String
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC =
"pinot.broker.enable.multistage.migration.metric";
public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC =
false;
+ public static final String
CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN =
+ "pinot.broker.enable.dynamic.filtering.semijoin";
+ public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN =
true;
public static class Request {
public static final String SQL = "sql";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]