xiangfu0 commented on code in PR #18848: URL: https://github.com/apache/pinot/pull/18848#discussion_r3491640238
########## pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToInnerRuntimeFilterRule.java: ########## @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.calcite.rel.rules; + +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions.RuntimeFilterMode; +import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; +import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; +import org.apache.pinot.calcite.rel.logical.RuntimeFilterRel; +import org.apache.pinot.calcite.rel.logical.RuntimeFilterRel.RuntimeFilterType; +import 
org.apache.pinot.spi.utils.CommonConstants; + + +/** + * Special Pinot rule that adds an additive probe-side runtime filter to an equi-INNER-JOIN. + * + * <p>This is the INNER-join counterpart of {@link PinotJoinToDynamicBroadcastRule}. The SEMI rule + * <em>replaces</em> the join with a leaf {@code IN} filter (sound only because a semi-join emits left + * columns only). An inner join projects <em>both</em> sides, so the join must keep running as a real + * intermediate-stage hash join. This rule therefore keeps the join and both of its exchanges intact + * and only <em>adds</em> a {@link RuntimeFilterRel} on top of the probe (left) leaf subtree, carrying a + * {@code PIPELINE_BREAKER} exchange of the build-side join keys. At runtime the probe leaf + * scan ANDs in a no-false-negative reducer (exact {@code IN} and/or bloom) built from those keys, + * dropping probe rows that cannot match before they are shuffled into the join. + * + * <p>Before (after exchange insertion): + * <pre> + * [ Inner Join ] + * / \ + * [xChange L] [xChange R] + * / \ + * [probe leaf] [build subtree] + * </pre> + * After: + * <pre> + * [ Inner Join ] (unchanged — still shuffles both sides) + * / \ + * [xChange L] [xChange R] + * / \ + * [RuntimeFilter] [build subtree] + * / \ + * [probe leaf] [PIPELINE_BREAKER xChange] + * | + * [build keys: Project(rightKeys) -> Filter(notNull) -> limit(maxBuildRows + 1)] + * </pre> + * + * <p>Disabled by default; enabled per-cluster/query (then defaulting to {@code AUTO}) or per-join via + * the {@code runtime_filter} join hint. Restricted to a leaf-pushable probe (TableScan with optional + * single-in single-out Project/Filter). Multi-key joins use an exact IN per key; the bloom tier is + * single-key only. 
+ */ +public class PinotJoinToInnerRuntimeFilterRule extends RelOptRule { + /** + * Placeholder instance registered in {@code PinotQueryRuleSets#POST_LOGICAL_RULES} to fix this rule's + * position in the post-logical sequence (right after the SEMI dynamic-broadcast rule). + * {@code QueryEnvironment#getTraitProgram} swaps it for a per-query instance carrying the resolved + * enable flag, because a Calcite rule cannot read query options at match time so the cluster/query-level + * enable state must be injected via the constructor. + */ + public static final PinotJoinToInnerRuntimeFilterRule INSTANCE = + new PinotJoinToInnerRuntimeFilterRule(PinotRuleUtils.PINOT_REL_FACTORY, false); + + private final boolean _queryLevelEnabled; + + public PinotJoinToInnerRuntimeFilterRule(RelBuilderFactory factory, boolean queryLevelEnabled) { + super(operand(Join.class, any()), factory, null); + _queryLevelEnabled = queryLevelEnabled; + } + + @Override + public boolean matches(RelOptRuleCall call) { + Join join = call.rel(0); + + // Resolve enablement: an explicit hint wins; otherwise fall back to the cluster/query-level default. + RuntimeFilterMode hintMode = PinotHintOptions.JoinHintOptions.getRuntimeFilterMode(join); + boolean enabled = hintMode != null ? hintMode != RuntimeFilterMode.OFF : _queryLevelEnabled; + if (!enabled) { + return false; + } + + // Lookup joins keep the right table local; a runtime filter does not apply. + if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { + return false; + } + + // Only equi-INNER joins with at least one equi-key (non-equi conditions are allowed — the join still + // runs, we only reduce the probe by the equi-keys). Multi-key uses an exact IN per key, which is a + // sound reducer: a matching probe row equals some build tuple, so it passes every per-key IN; the Review Comment: The multi-key runtime filter is not actually no-false-negative for FLOAT/DOUBLE join keys. 
Composite MSE join keys use `Key.equals`/`Arrays.equals`, which canonicalize NaNs, but the leaf `IN` path uses fastutil float/double sets, which distinguish NaN payloads. That means `(nanPayloadA, k)` on the build side and `(nanPayloadB, k)` on the probe side can join, but this reducer drops the probe row first. Please either disable multi-key runtime filters for FLOAT/DOUBLE joins or canonicalize the exact-IN semantics to match join-key equality, and add a regression test with distinct NaN payloads in a composite key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
