This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0a56c87e938 SQL: Plan non-equijoin conditions as cross join followed
by filter (#15302)
0a56c87e938 is described below
commit 0a56c87e93898bde2d0fb4ffafdbd793834632da
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Wed Nov 29 13:46:11 2023 +0530
SQL: Plan non-equijoin conditions as cross join followed by filter (#15302)
This PR revives #14978 with a few additions. Instead of planning an
unconditional cross join, the planner now splits the join condition so that
only the unsupported sub-conditions are evaluated post-join. To decide which
sub-condition goes where, the DruidJoinRule class is refactored to extract
the unsupported sub-conditions. A postJoinFilter is built from them and
applied as a filter on top of the join.
---
docs/querying/datasource.md | 38 ++-
.../org/apache/druid/msq/exec/MSQSelectTest.java | 2 +-
.../druid/msq/test/CalciteSelectQueryMSQTest.java | 9 +-
.../druid/sql/calcite/rule/DruidJoinRule.java | 355 ++++++++++++---------
.../druid/sql/calcite/CalciteJoinQueryTest.java | 135 +++++++-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 27 +-
.../apache/druid/sql/calcite/NotYetSupported.java | 22 +-
.../druid/sql/calcite/rule/DruidJoinRuleTest.java | 74 ++++-
website/.spelling | 2 +
9 files changed, 475 insertions(+), 189 deletions(-)
diff --git a/docs/querying/datasource.md b/docs/querying/datasource.md
index 5853f1e6dfc..5a1118deff7 100644
--- a/docs/querying/datasource.md
+++ b/docs/querying/datasource.md
@@ -320,12 +320,16 @@ Join datasources allow you to do a SQL-style join of two
datasources. Stacking j
you to join arbitrarily many datasources.
In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a
broadcast hash-join algorithm. This means
-that all datasources other than the leftmost "base" datasource must fit in
memory. It also means that the join condition
-must be an equality. This feature is intended mainly to allow joining regular
Druid tables with [lookup](#lookup),
-[inline](#inline), and [query](#query) datasources.
+that all datasources other than the leftmost "base" datasource must fit in
memory. In native queries, the join condition
+must be an equality. In SQL, any join condition is accepted, but only
equalities of a certain form
+(see [Joins in SQL](#joins-in-sql)) execute efficiently as part of a native
join. For other kinds of conditions, planner will try
+to re-arrange condition such that some of the sub-conditions are evaluated as
a filter on top of join and other
+sub-conditions are left out in the join condition. In worst case scenario, SQL
will execute the join condition as a
+cross join (cartesian product) plus a filter.
-Refer to the [Query execution](query-execution.md#join) page for more details
on how queries are executed when you
-use join datasources.
+This feature is intended mainly to allow joining regular Druid tables with
[lookup](#lookup), [inline](#inline), and
+[query](#query) datasources. Refer to the [Query
execution](query-execution.md#join) page for more details on how
+queries are executed when you use join datasources.
#### Joins in SQL
@@ -335,21 +339,23 @@ SQL joins take the form:
<o1> [ INNER | LEFT [OUTER] ] JOIN <o2> ON <condition>
```
-The condition must involve only equalities, but functions are okay, and there
can be multiple equalities ANDed together.
-Conditions like `t1.x = t2.x`, or `LOWER(t1.x) = t2.x`, or `t1.x = t2.x AND
t1.y = t2.y` can all be handled. Conditions
-like `t1.x <> t2.x` cannot currently be handled.
+Any condition is accepted, but only certain kinds of conditions execute
efficiently
+as a native join. The condition must be a single clause like the following, or
an `AND` of clauses involving at
+least one of the following:
-Note that Druid SQL is less rigid than what native join datasources can
handle. In cases where a SQL query does
-something that is not allowed as-is with a native join datasource, Druid SQL
will generate a subquery. This can have
-a substantial effect on performance and scalability, so it is something to
watch out for. Some examples of when the
-SQL layer will generate subqueries include:
+- Equality between fields of the same type on each side, like `t1 JOIN t2 ON
t1.x = t2.x`.
+- Equality between a function call on one side, and a field on the other side,
like `t1 JOIN t2 ON LOWER(t1.x) = t2.x`.
+- The equality operator may be `=` (which does not match nulls) or `IS NOT
DISTINCT FROM` (which does match nulls).
-- Joining a regular Druid table to itself, or to another regular Druid table.
The native join datasource can accept
-a table on the left-hand side, but not the right, so a subquery is needed.
+In other cases, Druid will either insert a subquery below the join, or will
use a cross join (cartesian product)
+followed by a filter. Joins executed in these ways may run into resource or
performance constraints. To determine
+if your query is using one of these execution paths, run `EXPLAIN PLAN FOR
<query>` and look for the following:
-- Join conditions where the expressions on either side are of different types.
+- `query` type datasources under the `left` or `right` key of your `join`
datasource.
+- `join` type datasource with `condition` set to `"1"` (cartesian product)
followed by a `filter` that encodes the
+ condition you provided.
-- Join conditions where the right-hand expression is not a direct column
access.
+In these cases, you may be able to improve the performance of your query by
rewriting it.
For more information about how Druid translates SQL to native queries, refer
to the
[Druid SQL](sql-translation.md) documentation.
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index e1c671a8665..c0dfe0f77f3 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -772,7 +772,7 @@ public class MSQSelectTest extends MSQTestBase
DruidExpression.ofColumn(ColumnType.STRING, "dim2"),
DruidExpression.ofColumn(ColumnType.STRING, "j0.k")
),
- JoinType.LEFT
+ NullHandling.sqlCompatible() ?
JoinType.INNER : JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
index c83ec91f454..974eed48734 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
@@ -145,7 +145,14 @@ public class CalciteSelectQueryMSQTest extends
CalciteQueryTest
@Ignore
@Override
- public void testUnplannableQueries()
+ public void testUnplannableScanOrderByNonTime()
+ {
+
+ }
+
+ @Ignore
+ @Override
+ public void testUnplannableJoinQueriesInNonSQLCompatibleMode()
{
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
index 6dc8ff00531..35e3e6eca80 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
@@ -28,8 +28,10 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@@ -43,6 +45,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -54,7 +57,6 @@ import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
@@ -82,7 +84,7 @@ public class DruidJoinRule extends RelOptRule
{
return new DruidJoinRule(plannerContext);
}
-
+
@Override
public boolean matches(RelOptRuleCall call)
{
@@ -93,9 +95,14 @@ public class DruidJoinRule extends RelOptRule
// 1) Can handle the join condition as a native join.
// 2) Left has a PartialDruidQuery (i.e., is a real query, not top-level
UNION ALL).
// 3) Right has a PartialDruidQuery (i.e., is a real query, not top-level
UNION ALL).
- return canHandleCondition(join.getCondition(),
join.getLeft().getRowType(), right, join.getCluster().getRexBuilder())
- && left.getPartialDruidQuery() != null
- && right.getPartialDruidQuery() != null;
+ return canHandleCondition(
+ join.getCondition(),
+ join.getLeft().getRowType(),
+ right,
+ join.getJoinType(),
+ join.getSystemFieldList(),
+ join.getCluster().getRexBuilder()
+ ) && left.getPartialDruidQuery() != null && right.getPartialDruidQuery()
!= null;
}
@Override
@@ -112,14 +119,12 @@ public class DruidJoinRule extends RelOptRule
final Filter leftFilter;
final List<RexNode> newProjectExprs = new ArrayList<>();
- // Already verified to be present in "matches", so just call "get".
// Can't be final, because we're going to reassign it up to a couple of
times.
ConditionAnalysis conditionAnalysis = analyzeCondition(
join.getCondition(),
join.getLeft().getRowType(),
- right,
rexBuilder
- ).get();
+ );
final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left
instanceof DruidQueryRel);
if (!plannerContext.getJoinAlgorithm().requiresSubquery()
@@ -184,7 +189,7 @@ public class DruidJoinRule extends RelOptRule
final DruidJoinQueryRel druidJoin = DruidJoinQueryRel.create(
join.copy(
join.getTraitSet(),
- conditionAnalysis.getCondition(rexBuilder),
+
conditionAnalysis.getConditionWithUnsupportedSubConditionsIgnored(rexBuilder),
newLeft,
newRight,
join.getJoinType(),
@@ -194,7 +199,7 @@ public class DruidJoinRule extends RelOptRule
left.getPlannerContext()
);
- final RelBuilder relBuilder =
+ RelBuilder relBuilder =
call.builder()
.push(druidJoin)
.project(
@@ -205,6 +210,12 @@ public class DruidJoinRule extends RelOptRule
)
);
+ // Build a post-join filter with whatever join sub-conditions were not
supported.
+ RexNode postJoinFilter = RexUtil.composeConjunction(rexBuilder,
conditionAnalysis.getUnsupportedOnSubConditions(), true);
+ if (postJoinFilter != null) {
+ relBuilder = relBuilder.filter(postJoinFilter);
+ }
+
call.transformTo(relBuilder.build());
}
@@ -222,100 +233,20 @@ public class DruidJoinRule extends RelOptRule
}
/**
- * Returns whether {@link #analyzeCondition} would return something.
+ * Returns whether we can handle the join condition. In case, some
conditions in an AND expression are not supported,
+ * they are extracted into a post-join filter instead.
*/
@VisibleForTesting
- boolean canHandleCondition(final RexNode condition, final RelDataType
leftRowType, DruidRel<?> right, final RexBuilder rexBuilder)
- {
- return analyzeCondition(condition, leftRowType, right,
rexBuilder).isPresent();
- }
-
- /**
- * If this condition is an AND of some combination of (1) literals; (2)
equality conditions of the form
- * {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}.
- */
- private Optional<ConditionAnalysis> analyzeCondition(
+ public boolean canHandleCondition(
final RexNode condition,
final RelDataType leftRowType,
- final DruidRel<?> right,
+ DruidRel<?> right,
+ JoinRelType joinType,
+ List<RelDataTypeField> systemFieldList,
final RexBuilder rexBuilder
)
{
- final List<RexNode> subConditions = decomposeAnd(condition);
- final List<RexEquality> equalitySubConditions = new ArrayList<>();
- final List<RexLiteral> literalSubConditions = new ArrayList<>();
- final int numLeftFields = leftRowType.getFieldCount();
- final Set<RexInputRef> rightColumns = new HashSet<>();
-
- for (RexNode subCondition : subConditions) {
- if (RexUtil.isLiteral(subCondition, true)) {
- if (subCondition.isA(SqlKind.CAST)) {
- // This is CAST(literal) which is always OK.
- // We know that this is CAST(literal) as it passed the check from
RexUtil.isLiteral
- RexCall call = (RexCall) subCondition;
- // We have to verify the types of the cast here, because if the
underlying literal and the cast output type
- // are different, then skipping the cast might change the meaning of
the subcondition.
- if
(call.getType().getSqlTypeName().equals(call.getOperands().get(0).getType().getSqlTypeName()))
{
- // If the types are the same, unwrap the cast and use the
underlying literal.
- literalSubConditions.add((RexLiteral) call.getOperands().get(0));
- } else {
- // If the types are not the same, return Optional.empty()
indicating the condition is not supported.
- return Optional.empty();
- }
- } else {
- // Literals are always OK.
- literalSubConditions.add((RexLiteral) subCondition);
- }
- continue;
- }
-
- RexNode firstOperand;
- RexNode secondOperand;
- SqlKind comparisonKind;
-
- if (subCondition.isA(SqlKind.INPUT_REF)) {
- firstOperand = rexBuilder.makeLiteral(true);
- secondOperand = subCondition;
- comparisonKind = SqlKind.EQUALS;
-
- if
(!SqlTypeName.BOOLEAN_TYPES.contains(secondOperand.getType().getSqlTypeName()))
{
- plannerContext.setPlanningError(
- "SQL requires a join with '%s' condition where the column is of
the type %s, that is not supported",
- subCondition.getKind(),
- secondOperand.getType().getSqlTypeName()
- );
- return Optional.empty();
-
- }
- } else if (subCondition.isA(SqlKind.EQUALS) ||
subCondition.isA(SqlKind.IS_NOT_DISTINCT_FROM)) {
- final List<RexNode> operands = ((RexCall) subCondition).getOperands();
- Preconditions.checkState(operands.size() == 2, "Expected 2 operands,
got[%s]", operands.size());
- firstOperand = operands.get(0);
- secondOperand = operands.get(1);
- comparisonKind = subCondition.getKind();
- } else {
- // If it's not EQUALS or a BOOLEAN input ref, it's not supported.
- plannerContext.setPlanningError(
- "SQL requires a join with '%s' condition that is not supported.",
- subCondition.getKind()
- );
- return Optional.empty();
- }
-
- if (isLeftExpression(firstOperand, numLeftFields) &&
isRightInputRef(secondOperand, numLeftFields)) {
- equalitySubConditions.add(new RexEquality(firstOperand, (RexInputRef)
secondOperand, comparisonKind));
- rightColumns.add((RexInputRef) secondOperand);
- } else if (isRightInputRef(firstOperand, numLeftFields)
- && isLeftExpression(secondOperand, numLeftFields)) {
- equalitySubConditions.add(new RexEquality(secondOperand, (RexInputRef)
firstOperand, subCondition.getKind()));
- rightColumns.add((RexInputRef) firstOperand);
- } else {
- // Cannot handle this condition.
- plannerContext.setPlanningError("SQL is resulting in a join that has
unsupported operand types.");
- return Optional.empty();
- }
- }
-
+ ConditionAnalysis conditionAnalysis = analyzeCondition(condition,
leftRowType, rexBuilder);
// if the right side requires a subquery, then even lookup will be
transformed to a QueryDataSource
// thereby allowing join conditions on both k and v columns of the lookup
if (right != null
@@ -323,63 +254,32 @@ public class DruidJoinRule extends RelOptRule
&& right instanceof DruidQueryRel) {
DruidQueryRel druidQueryRel = (DruidQueryRel) right;
if (druidQueryRel.getDruidTable().getDataSource() instanceof
LookupDataSource) {
- long distinctRightColumns =
rightColumns.stream().map(RexSlot::getIndex).distinct().count();
+ long distinctRightColumns =
conditionAnalysis.rightColumns.stream().map(RexSlot::getIndex).distinct().count();
if (distinctRightColumns > 1) {
// it means that the join's right side is lookup and the join
condition contains both key and value columns of lookup.
// currently, the lookup datasource in the native engine doesn't
support using value column in the join condition.
plannerContext.setPlanningError(
"SQL is resulting in a join involving lookup where value column
is used in the condition.");
- return Optional.empty();
+ return false;
}
}
}
- return Optional.of(
- new ConditionAnalysis(
- numLeftFields,
- equalitySubConditions,
- literalSubConditions
- )
- );
- }
+ if (joinType != JoinRelType.INNER || !systemFieldList.isEmpty() ||
NullHandling.replaceWithDefault()) {
+ // I am not sure in what case, the list of system fields will be not
empty. I have just picked up this logic
+ // directly from
https://github.com/apache/calcite/blob/calcite-1.35.0/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java#L58
- @VisibleForTesting
- static List<RexNode> decomposeAnd(final RexNode condition)
- {
- final List<RexNode> retVal = new ArrayList<>();
- final Stack<RexNode> stack = new Stack<>();
-
- stack.push(condition);
-
- while (!stack.empty()) {
- final RexNode current = stack.pop();
-
- if (current.isA(SqlKind.AND)) {
- final List<RexNode> operands = ((RexCall) current).getOperands();
-
- // Add right-to-left, so when we unwind the stack, the operands are in
the original order.
- for (int i = operands.size() - 1; i >= 0; i--) {
- stack.push(operands.get(i));
- }
- } else {
- retVal.add(current);
- }
+ // Also to avoid results changes for existing queries in non-null
handling mode, we don't handle unsupported
+ // conditions. Otherwise, some left/right joins with a condition that
doesn't allow nulls on join input will
+ // be converted to inner joins. See Test
CalciteJoinQueryTest#testFilterAndGroupByLookupUsingJoinOperatorBackwards
+ // for an example.
+ return conditionAnalysis.getUnsupportedOnSubConditions().isEmpty();
}
-
- return retVal;
- }
-
- private boolean isLeftExpression(final RexNode rexNode, final int
numLeftFields)
- {
- return
ImmutableBitSet.range(numLeftFields).contains(RelOptUtil.InputFinder.bits(rexNode));
+
+ return true;
}
- private static boolean isRightInputRef(final RexNode rexNode, final int
numLeftFields)
- {
- return rexNode.isA(SqlKind.INPUT_REF) && ((RexInputRef)
rexNode).getIndex() >= numLeftFields;
- }
-
- static class ConditionAnalysis
+ public static class ConditionAnalysis
{
/**
* Number of fields on the left-hand side. Useful for identifying if a
particular field is from on the left
@@ -397,15 +297,26 @@ public class DruidJoinRule extends RelOptRule
*/
private final List<RexLiteral> literalSubConditions;
+ /**
+ * Sub-conditions in join clause that cannot be handled by the
DruidJoinRule.
+ */
+ private final List<RexNode> unsupportedOnSubConditions;
+
+ private final Set<RexInputRef> rightColumns;
+
ConditionAnalysis(
int numLeftFields,
List<RexEquality> equalitySubConditions,
- List<RexLiteral> literalSubConditions
+ List<RexLiteral> literalSubConditions,
+ List<RexNode> unsupportedOnSubConditions,
+ Set<RexInputRef> rightColumns
)
{
this.numLeftFields = numLeftFields;
this.equalitySubConditions = equalitySubConditions;
this.literalSubConditions = literalSubConditions;
+ this.unsupportedOnSubConditions = unsupportedOnSubConditions;
+ this.rightColumns = rightColumns;
}
public ConditionAnalysis pushThroughLeftProject(final Project leftProject)
@@ -414,6 +325,7 @@ public class DruidJoinRule extends RelOptRule
final int rhsShift =
leftProject.getInput().getRowType().getFieldCount() -
leftProject.getRowType().getFieldCount();
+ // We leave unsupportedSubConditions un-touched as they are evaluated
above join anyway.
return new ConditionAnalysis(
leftProject.getInput().getRowType().getFieldCount(),
equalitySubConditions
@@ -426,7 +338,9 @@ public class DruidJoinRule extends RelOptRule
)
)
.collect(Collectors.toList()),
- literalSubConditions
+ literalSubConditions,
+ unsupportedOnSubConditions,
+ rightColumns
);
}
@@ -434,6 +348,7 @@ public class DruidJoinRule extends RelOptRule
{
Preconditions.checkArgument(onlyUsesMappingsFromRightProject(rightProject),
"Cannot push through");
+ // We leave unsupportedSubConditions un-touched as they are evaluated
above join anyway.
return new ConditionAnalysis(
numLeftFields,
equalitySubConditions
@@ -452,7 +367,9 @@ public class DruidJoinRule extends RelOptRule
)
)
.collect(Collectors.toList()),
- literalSubConditions
+ literalSubConditions,
+ unsupportedOnSubConditions,
+ rightColumns
);
}
@@ -469,7 +386,7 @@ public class DruidJoinRule extends RelOptRule
return true;
}
- public RexNode getCondition(final RexBuilder rexBuilder)
+ public RexNode getConditionWithUnsupportedSubConditionsIgnored(final
RexBuilder rexBuilder)
{
return RexUtil.composeConjunction(
rexBuilder,
@@ -484,6 +401,11 @@ public class DruidJoinRule extends RelOptRule
);
}
+ public List<RexNode> getUnsupportedOnSubConditions()
+ {
+ return unsupportedOnSubConditions;
+ }
+
@Override
public String toString()
{
@@ -491,10 +413,153 @@ public class DruidJoinRule extends RelOptRule
"numLeftFields=" + numLeftFields +
", equalitySubConditions=" + equalitySubConditions +
", literalSubConditions=" + literalSubConditions +
+ ", unsupportedSubConditions=" + unsupportedOnSubConditions +
+ ", rightColumns=" + rightColumns +
'}';
}
}
+ /**
+ * If this condition is an AND of some combination of
+ * (1) literals;
+ * (2) equality conditions of the form
+ * (3) unsupported conditions
+ * <p>
+ * Returns empty if the join cannot be supported at all. It can return
non-empty with some unsupported conditions
+ * that can be extracted into post join filter.
+ * {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}.
+ */
+ public ConditionAnalysis analyzeCondition(
+ final RexNode condition,
+ final RelDataType leftRowType,
+ final RexBuilder rexBuilder
+ )
+ {
+ final List<RexNode> subConditions = decomposeAnd(condition);
+ final List<RexEquality> equalitySubConditions = new ArrayList<>();
+ final List<RexLiteral> literalSubConditions = new ArrayList<>();
+ final List<RexNode> unSupportedSubConditions = new ArrayList<>();
+ final Set<RexInputRef> rightColumns = new HashSet<>();
+ final int numLeftFields = leftRowType.getFieldCount();
+
+ for (RexNode subCondition : subConditions) {
+ if (RexUtil.isLiteral(subCondition, true)) {
+ if (subCondition.isA(SqlKind.CAST)) {
+ // This is CAST(literal) which is always OK.
+ // We know that this is CAST(literal) as it passed the check from
RexUtil.isLiteral
+ RexCall call = (RexCall) subCondition;
+ // We have to verify the types of the cast here, because if the
underlying literal and the cast output type
+ // are different, then skipping the cast might change the meaning of
the subcondition.
+ if
(call.getType().getSqlTypeName().equals(call.getOperands().get(0).getType().getSqlTypeName()))
{
+ // If the types are the same, unwrap the cast and use the
underlying literal.
+ literalSubConditions.add((RexLiteral) call.getOperands().get(0));
+ } else {
+ // If the types are not the same, add to unsupported conditions.
+ unSupportedSubConditions.add(subCondition);
+ continue;
+ }
+ } else {
+ // Literals are always OK.
+ literalSubConditions.add((RexLiteral) subCondition);
+ }
+ continue;
+ }
+
+ RexNode firstOperand;
+ RexNode secondOperand;
+ SqlKind comparisonKind;
+
+ if (subCondition.isA(SqlKind.INPUT_REF)) {
+ firstOperand = rexBuilder.makeLiteral(true);
+ secondOperand = subCondition;
+ comparisonKind = SqlKind.EQUALS;
+
+ if
(!SqlTypeName.BOOLEAN_TYPES.contains(secondOperand.getType().getSqlTypeName()))
{
+ plannerContext.setPlanningError(
+ "SQL requires a join with '%s' condition where the column is of
the type %s, that is not supported",
+ subCondition.getKind(),
+ secondOperand.getType().getSqlTypeName()
+ );
+ unSupportedSubConditions.add(subCondition);
+ continue;
+
+ }
+ } else if (subCondition.isA(SqlKind.EQUALS) ||
subCondition.isA(SqlKind.IS_NOT_DISTINCT_FROM)) {
+ final List<RexNode> operands = ((RexCall) subCondition).getOperands();
+ Preconditions.checkState(operands.size() == 2, "Expected 2 operands,
got[%s]", operands.size());
+ firstOperand = operands.get(0);
+ secondOperand = operands.get(1);
+ comparisonKind = subCondition.getKind();
+ } else {
+ // If it's not EQUALS or a BOOLEAN input ref, it's not supported.
+ plannerContext.setPlanningError(
+ "SQL requires a join with '%s' condition that is not supported.",
+ subCondition.getKind()
+ );
+ unSupportedSubConditions.add(subCondition);
+ continue;
+ }
+
+ if (isLeftExpression(firstOperand, numLeftFields) &&
isRightInputRef(secondOperand, numLeftFields)) {
+ equalitySubConditions.add(new RexEquality(firstOperand, (RexInputRef)
secondOperand, comparisonKind));
+ rightColumns.add((RexInputRef) secondOperand);
+ } else if (isRightInputRef(firstOperand, numLeftFields)
+ && isLeftExpression(secondOperand, numLeftFields)) {
+ equalitySubConditions.add(new RexEquality(secondOperand, (RexInputRef)
firstOperand, subCondition.getKind()));
+ rightColumns.add((RexInputRef) firstOperand);
+ } else {
+ // Cannot handle this condition.
+ plannerContext.setPlanningError("SQL is resulting in a join that has
unsupported operand types.");
+ unSupportedSubConditions.add(subCondition);
+ }
+ }
+
+ return new ConditionAnalysis(
+ numLeftFields,
+ equalitySubConditions,
+ literalSubConditions,
+ unSupportedSubConditions,
+ rightColumns
+ );
+ }
+
+ @VisibleForTesting
+ static List<RexNode> decomposeAnd(final RexNode condition)
+ {
+ final List<RexNode> retVal = new ArrayList<>();
+ final Stack<RexNode> stack = new Stack<>();
+
+ stack.push(condition);
+
+ while (!stack.empty()) {
+ final RexNode current = stack.pop();
+
+ if (current.isA(SqlKind.AND)) {
+ final List<RexNode> operands = ((RexCall) current).getOperands();
+
+ // Add right-to-left, so when we unwind the stack, the operands are in
the original order.
+ for (int i = operands.size() - 1; i >= 0; i--) {
+ stack.push(operands.get(i));
+ }
+ } else {
+ retVal.add(current);
+ }
+ }
+
+ return retVal;
+ }
+
+ private static boolean isLeftExpression(final RexNode rexNode, final int
numLeftFields)
+ {
+ return
ImmutableBitSet.range(numLeftFields).contains(RelOptUtil.InputFinder.bits(rexNode));
+ }
+
+ private static boolean isRightInputRef(final RexNode rexNode, final int
numLeftFields)
+ {
+ return rexNode.isA(SqlKind.INPUT_REF) && ((RexInputRef)
rexNode).getIndex() >= numLeftFields;
+ }
+
+
/**
* Like {@link org.apache.druid.segment.join.Equality} but uses {@link
RexNode} instead of
* {@link org.apache.druid.math.expr.Expr}.
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index ea74678ba53..a8e16de2675 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -723,7 +723,7 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
),
"j0.",
equalsCondition(makeColumnExpression("k"),
makeColumnExpression("j0.dim2")),
- JoinType.RIGHT
+ NullHandling.sqlCompatible() ? JoinType.INNER
: JoinType.RIGHT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@@ -740,7 +740,6 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
new Object[]{"xabc", 1L}
)
: ImmutableList.of(
- new Object[]{null, 5L},
new Object[]{"xabc", 1L}
)
);
@@ -768,7 +767,7 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(makeColumnExpression("dim2"),
makeColumnExpression("j0.k")),
- JoinType.LEFT
+ NullHandling.sqlCompatible() ? JoinType.INNER
: JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@@ -784,8 +783,7 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
new Object[]{NULL_STRING, 3L},
new Object[]{"xabc", 1L}
)
- :
- ImmutableList.of(
+ : ImmutableList.of(
new Object[]{"xabc", 1L}
)
);
@@ -821,7 +819,7 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(makeColumnExpression("dim2"),
makeColumnExpression("j0.k")),
- JoinType.LEFT
+ NullHandling.sqlCompatible() ? JoinType.INNER
: JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@@ -1965,17 +1963,25 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
.build()
),
"_j0.",
- "1",
+ NullHandling.sqlCompatible() ?
+ equalsCondition(
+ DruidExpression.fromExpression("CAST(\"j0.k\",
'LONG')"),
+ DruidExpression.ofColumn(ColumnType.LONG,
"_j0.cnt")
+ )
+ : "1",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(new CountAggregatorFactory("a0"))
- .filters(and(
- expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))"),
- expressionFilter("(CAST(\"j0.k\", 'LONG') ==
\"_j0.cnt\")")
- ))
+ .filters(
+ NullHandling.sqlCompatible() ?
+ expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))")
+ : and(
+
expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))"),
+
expressionFilter("(CAST(\"j0.k\", 'LONG') == \"_j0.cnt\")")
+ ))
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -4558,6 +4564,113 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
);
}
+ @Test
+ @Parameters(source = QueryContextForJoinProvider.class)
+ public void testJoinWithNonEquiCondition(Map<String, Object> queryContext)
+ {
+ // Native JOIN operator cannot handle the condition, so a SQL JOIN with
greater-than is translated into a
+ // cross join with a filter.
+ cannotVectorize();
+
+ // We don't handle non-equi join conditions for non-sql compatible mode.
+ Assume.assumeFalse(NullHandling.replaceWithDefault());
+
+ testQuery(
+ "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 > y.m1",
+ queryContext,
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("m1")
+ .context(queryContext)
+ .build()
+ ),
+ "j0.",
+ "1",
+ JoinType.INNER
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .filters(expressionFilter("(\"m1\" > \"j0.m1\")"))
+ .columns("j0.m1", "m1")
+ .context(queryContext)
+ .build()
+ ),
+ sortIfSortBased(
+ ImmutableList.of(
+ new Object[]{2.0f, 1.0f},
+ new Object[]{3.0f, 1.0f},
+ new Object[]{3.0f, 2.0f},
+ new Object[]{4.0f, 1.0f},
+ new Object[]{4.0f, 2.0f},
+ new Object[]{4.0f, 3.0f},
+ new Object[]{5.0f, 1.0f},
+ new Object[]{5.0f, 2.0f},
+ new Object[]{5.0f, 3.0f},
+ new Object[]{5.0f, 4.0f},
+ new Object[]{6.0f, 1.0f},
+ new Object[]{6.0f, 2.0f},
+ new Object[]{6.0f, 3.0f},
+ new Object[]{6.0f, 4.0f},
+ new Object[]{6.0f, 5.0f}
+ ),
+ 1,
+ 0
+ )
+ );
+ }
+
+ @Test
+ @Parameters(source = QueryContextForJoinProvider.class)
+ public void testJoinWithEquiAndNonEquiCondition(Map<String, Object>
queryContext)
+ {
+ // Native JOIN operator cannot handle the condition, so a SQL JOIN with
greater-than is translated into a
+ // cross join with a filter.
+ cannotVectorize();
+
+ // We don't handle non-equi join conditions for non-sql compatible mode.
+ Assume.assumeFalse(NullHandling.replaceWithDefault());
+
+ testQuery(
+ "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1
+ y.m1 = 6.0",
+ queryContext,
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("m1")
+ .context(queryContext)
+ .build()
+ ),
+ "j0.",
+ equalsCondition(makeColumnExpression("m1"), makeColumnExpression("j0.m1")),
+ JoinType.INNER
+ )
+ )
+ .virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.DOUBLE))
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .filters(
+ equality("v0", 6.0, ColumnType.DOUBLE)
+ )
+ .columns("j0.m1", "m1")
+ .context(queryContext)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{3.0f, 3.0f})
+ );
+ }
+
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testUsingSubqueryAsPartOfAndFilter(Map<String, Object> queryContext)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index e03130a04f1..00ea933bb1e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -126,6 +126,7 @@ import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -5414,32 +5415,40 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
- public void testUnplannableQueries()
+ public void testUnplannableScanOrderByNonTime()
{
msqIncompatible();
// All of these queries are unplannable because they rely on features Druid doesn't support.
// This test is here to confirm that we don't fall back to Calcite's interpreter or enumerable implementation.
// It's also here so when we do support these features, we can have "real" tests for these queries.
- final Map<String, String> queries = ImmutableMap.of(
- // SELECT query with order by non-__time.
+ assertQueryIsUnplannable(
"SELECT dim1 FROM druid.foo ORDER BY dim1",
- "SQL query requires ordering a table by non-time column [[dim1]], which is not supported.",
+ "SQL query requires ordering a table by non-time column [[dim1]], which is not supported."
+ );
+ }
+
+ @NotYetSupported(Modes.ERROR_HANDLING)
+ @Test
+ public void testUnplannableJoinQueriesInNonSQLCompatibleMode()
+ {
+ msqIncompatible();
+
+ Assume.assumeFalse(NullHandling.sqlCompatible());
+ assertQueryIsUnplannable(
// JOIN condition with not-equals (<>).
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k",
- "SQL requires a join with 'NOT_EQUALS' condition that is not supported.",
+ "SQL requires a join with 'NOT_EQUALS' condition that is not supported."
+ );
+ assertQueryIsUnplannable(
// JOIN condition with a function of both sides.
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n",
"SQL requires a join with 'GREATER_THAN' condition that is not supported."
);
-
- for (final Map.Entry<String, String> queryErrorPair : queries.entrySet()) {
- assertQueryIsUnplannable(queryErrorPair.getKey(), queryErrorPair.getValue());
- }
}
@Test
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
index c660e0cae21..a2372f3e0e5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
@@ -129,11 +129,27 @@ public @interface NotYetSupported
public void evaluate()
{
Modes ignoreMode = annotation.value();
- Throwable e = assertThrows(
+ Throwable e = null;
+ try {
+ base.evaluate();
+ }
+ catch (Throwable t) {
+ e = t;
+ }
+ // If the base test case is supposed to be ignored already, just skip the further evaluation
+ if (e instanceof AssumptionViolatedException) {
+ throw (AssumptionViolatedException) e;
+ }
+ Throwable finalE = e;
+ assertThrows(
"Expected that this testcase will fail - it might got fixed; or failure have changed?",
ignoreMode.throwableClass,
- base::evaluate
- );
+ () -> {
+ if (finalE != null) {
+ throw finalE;
+ }
+ }
+ );
String trace = Throwables.getStackTraceAsString(e);
Matcher m = annotation.value().getPattern().matcher(trace);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
index e531580162e..fa76ba3a3c9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.rule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
@@ -29,6 +30,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.QueryContext;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.planner.JoinAlgorithm;
@@ -39,6 +41,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.math.BigDecimal;
+import java.util.Collections;
import java.util.List;
public class DruidJoinRuleTest
@@ -67,6 +70,7 @@ public class DruidJoinRuleTest
@Before
public void setup()
{
+ NullHandling.initializeForTests();
PlannerContext plannerContext = Mockito.mock(PlannerContext.class);
Mockito.when(plannerContext.queryContext()).thenReturn(QueryContext.empty());
Mockito.when(plannerContext.getJoinAlgorithm()).thenReturn(JoinAlgorithm.BROADCAST);
@@ -85,6 +89,8 @@ public class DruidJoinRuleTest
),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
rexBuilder
)
);
@@ -106,6 +112,8 @@ public class DruidJoinRuleTest
),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
rexBuilder
)
);
@@ -114,7 +122,8 @@ public class DruidJoinRuleTest
@Test
public void test_canHandleCondition_leftEqRightFn()
{
- Assert.assertFalse(
+ Assert.assertEquals(
+ NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode.
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
@@ -127,6 +136,8 @@ public class DruidJoinRuleTest
),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
rexBuilder
)
);
@@ -135,7 +146,9 @@ public class DruidJoinRuleTest
@Test
public void test_canHandleCondition_leftEqLeft()
{
- Assert.assertFalse(
+
+ Assert.assertEquals(
+ NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode.
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
@@ -144,6 +157,8 @@ public class DruidJoinRuleTest
),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
rexBuilder
)
);
@@ -152,7 +167,8 @@ public class DruidJoinRuleTest
@Test
public void test_canHandleCondition_rightEqRight()
{
- Assert.assertFalse(
+ Assert.assertEquals(
+ NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode.
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
@@ -161,6 +177,54 @@ public class DruidJoinRuleTest
),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
+ rexBuilder
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_leftEqRightFn_leftJoin()
+ {
+ Assert.assertFalse(
+ druidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("foo"),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+ )
+ ),
+ leftType,
+ null,
+ JoinRelType.LEFT,
+ ImmutableList.of(),
+ rexBuilder
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_leftEqRightFn_systemFields()
+ {
+ Assert.assertFalse(
+ druidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("foo"),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+ )
+ ),
+ leftType,
+ null,
+ JoinRelType.INNER,
+ Collections.singletonList(null),
rexBuilder
)
);
@@ -174,6 +238,8 @@ public class DruidJoinRuleTest
rexBuilder.makeLiteral(true),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
rexBuilder
)
);
@@ -187,6 +253,8 @@ public class DruidJoinRuleTest
rexBuilder.makeLiteral(false),
leftType,
null,
+ JoinRelType.INNER,
+ ImmutableList.of(),
rexBuilder
)
);
diff --git a/website/.spelling b/website/.spelling
index 78e25b3285f..002998c442c 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -44,6 +44,7 @@ Base64
Base64-encoded
ByteBuffer
bottlenecked
+cartesian
concat
CIDR
CORS
@@ -504,6 +505,7 @@ stdout
storages
stringDictionaryEncoding
stringified
+sub-conditions
subarray
subnet
subqueries
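
Illustrative note (not part of the commit): the join tests in this diff expect a non-equi condition to be evaluated as a filter applied to the rows produced by the join. The sketch below shows that idea in plain Java, outside Druid. It does not use DruidJoinRule or any Druid API; the class and method names are hypothetical, and it assumes m1 takes the values 1.0 through 6.0, as the expected results in the tests suggest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical illustration only; this is not Druid code.
class CrossJoinFilterSketch
{
  // Pairs every left value with every right value (a cross join) and keeps
  // only the pairs accepted by the post-join filter.
  public static List<float[]> crossJoinThenFilter(
      float[] left,
      float[] right,
      BiPredicate<Float, Float> postJoinFilter
  )
  {
    List<float[]> rows = new ArrayList<>();
    for (float l : left) {
      for (float r : right) {
        if (postJoinFilter.test(l, r)) {
          rows.add(new float[]{l, r});
        }
      }
    }
    return rows;
  }

  public static void main(String[] args)
  {
    // Assumed contents of foo.m1.
    float[] m1 = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};

    // ON x.m1 > y.m1: nothing can serve as an equality join condition,
    // so the whole condition becomes the post-join filter.
    List<float[]> greaterThan = crossJoinThenFilter(m1, m1, (x, y) -> x > y);
    System.out.println(greaterThan.size()); // 15 rows

    // ON x.m1 = y.m1 AND x.m1 + y.m1 = 6.0: this sketch folds both parts
    // into the filter for brevity, whereas the expected native plan keeps
    // the equality as the join condition and filters on the sum.
    List<float[]> mixed = crossJoinThenFilter(m1, m1, (x, y) -> x.equals(y) && x + y == 6.0f);
    System.out.println(mixed.size()); // 1 row: {3.0, 3.0}
  }
}
```

The nested loop is quadratic in the input sizes, which is why a cross join followed by a filter is only a fallback for conditions that an equality-based join cannot evaluate.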