This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 041865a80f [multistage] support sort push-down (#9832)
041865a80f is described below
commit 041865a80f7a2359270571a2343049bb1f294fe5
Author: Almog Gavra <[email protected]>
AuthorDate: Wed Nov 30 10:25:34 2022 -0800
[multistage] support sort push-down (#9832)
---
.../query/selection/SelectionOperatorService.java | 17 +-
.../query/selection/SelectionOperatorUtils.java | 15 +
.../rel/rules/ImmutableSortExchangeCopyRule.java | 414 +++++++++++++++++++++
.../calcite/rel/rules/PinotQueryRuleSets.java | 9 +-
.../rel/rules/PinotSortExchangeCopyRule.java | 114 ++++++
.../rel/rules/PinotSortExchangeNodeInsertRule.java | 18 +-
.../query/planner/logical/RelToStageConverter.java | 5 +-
.../query/planner/logical/RexExpressionUtils.java | 10 +
.../pinot/query/planner/logical/StagePlanner.java | 6 +-
.../apache/pinot/query/planner/stage/SortNode.java | 4 +-
.../rel/rules/PinotSortExchangeCopyRuleTest.java | 249 +++++++++++++
.../apache/pinot/query/runtime/QueryRunner.java | 46 ++-
.../pinot/query/runtime/operator/SortOperator.java | 2 +-
.../src/test/resources/queries/OrderBy.json | 232 ++++++++++++
14 files changed, 1111 insertions(+), 30 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index f77c936a21..525ab7af9e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -25,7 +25,6 @@ import java.util.PriorityQueue;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.utils.LoopUtils;
import org.roaringbitmap.RoaringBitmap;
@@ -134,18 +133,7 @@ public class SelectionOperatorService {
public ResultTable renderResultTableWithOrdering() {
int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
int numColumns = columnIndices.length;
-
- // Construct the result data schema
- String[] columnNames = _dataSchema.getColumnNames();
- ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
- String[] resultColumnNames = new String[numColumns];
- ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
- for (int i = 0; i < numColumns; i++) {
- int columnIndex = columnIndices[i];
- resultColumnNames[i] = columnNames[columnIndex];
- resultColumnDataTypes[i] = columnDataTypes[columnIndex];
- }
- DataSchema resultDataSchema = new DataSchema(resultColumnNames,
resultColumnDataTypes);
+ DataSchema resultDataSchema =
SelectionOperatorUtils.getSchemaForProjection(_dataSchema, columnIndices);
// Extract the result rows
LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
@@ -156,9 +144,10 @@ public class SelectionOperatorService {
for (int i = 0; i < numColumns; i++) {
Object value = row[columnIndices[i]];
if (value != null) {
- extractedRow[i] = resultColumnDataTypes[i].convertAndFormat(value);
+ extractedRow[i] =
resultDataSchema.getColumnDataType(i).convertAndFormat(value);
}
}
+
rowsInSelectionResults.addFirst(extractedRow);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index e53b4f2982..bda3efab0b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -656,4 +656,19 @@ public class SelectionOperatorUtils {
queue.offer(value);
}
}
+
+  /**
+   * Builds the schema of a projection of {@code dataSchema}.
+   *
+   * <p>Column {@code i} of the returned schema takes the name and the data type of column
+   * {@code columnIndices[i]} of {@code dataSchema}.
+   *
+   * @param dataSchema the schema being projected
+   * @param columnIndices indices into {@code dataSchema}, in the order the projected columns should appear
+   * @return a new {@link DataSchema} with {@code columnIndices.length} columns
+   */
+  public static DataSchema getSchemaForProjection(DataSchema dataSchema, int[] columnIndices) {
+    int width = columnIndices.length;
+    String[] sourceNames = dataSchema.getColumnNames();
+    ColumnDataType[] sourceTypes = dataSchema.getColumnDataTypes();
+
+    String[] projectedNames = new String[width];
+    ColumnDataType[] projectedTypes = new ColumnDataType[width];
+    int target = 0;
+    for (int source : columnIndices) {
+      projectedNames[target] = sourceNames[source];
+      projectedTypes[target] = sourceTypes[source];
+      target++;
+    }
+    return new DataSchema(projectedNames, projectedTypes);
+  }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
new file mode 100644
index 0000000000..eca7fc6110
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+// NOTE: this file was generated using Calcite's code generator. Rather than pulling in all of the
+// dependencies needed for code generation, we ran the generator by hand and checked in its output.
+// If this file needs active development, re-generate it using Calcite's generator.
+
+// CHECKSTYLE:OFF
+
+import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * {@code ImmutableSortExchangeCopyRule} contains immutable implementation
classes generated from
+ * abstract value types defined as nested inside {@link PinotSortExchangeCopyRule}.
+ * @see ImmutableSortExchangeCopyRule.Config
+ */
+@SuppressWarnings({"all"})
+final class ImmutableSortExchangeCopyRule {
+ private ImmutableSortExchangeCopyRule() {
+ }
+
+ /**
+ * Immutable implementation of {@link PinotSortExchangeCopyRule.Config}.
+ * <p>
+ * Use the builder to create immutable instances:
+ * {@code ImmutableSortExchangeCopyRule.Config.builder()}.
+ * Use the static factory method to get the default singleton instance:
+ * {@code ImmutableSortExchangeCopyRule.Config.of()}.
+ */
+ static final class Config implements PinotSortExchangeCopyRule.Config {
+ private final RelBuilderFactory relBuilderFactory;
+ private final @Nullable String description;
+ private final RelRule.OperandTransform operandSupplier;
+
+ // Default instance: relBuilderFactory and operandSupplier come from their initializers and
+ // description stays null.
+ private Config() {
+ this.description = null;
+ this.relBuilderFactory = initShim.relBuilderFactory();
+ this.operandSupplier = initShim.operandSupplier();
+ this.initShim = null;
+ }
+
+ // Builder-based construction: attributes set on the builder override the initializer defaults.
+ private Config(ImmutableSortExchangeCopyRule.Config.Builder builder) {
+ this.description = builder.description;
+ if (builder.relBuilderFactory != null) {
+ initShim.withRelBuilderFactory(builder.relBuilderFactory);
+ }
+ if (builder.operandSupplier != null) {
+ initShim.withOperandSupplier(builder.operandSupplier);
+ }
+ this.relBuilderFactory = initShim.relBuilderFactory();
+ this.operandSupplier = initShim.operandSupplier();
+ this.initShim = null;
+ }
+
+ // Used by the with* copy methods: all attribute values are taken as given.
+ private Config(RelBuilderFactory relBuilderFactory,
+ @Nullable String description,
+ RelRule.OperandTransform operandSupplier) {
+ this.relBuilderFactory = relBuilderFactory;
+ this.description = description;
+ this.operandSupplier = operandSupplier;
+ this.initShim = null;
+ }
+
+ // Construction-time state. InitShim lazily resolves attributes that were not supplied, falling
+ // back to the interface defaults, and reports an initializer cycle if resolving an attribute
+ // re-enters that attribute's own initializer. Every constructor sets initShim to null once the
+ // final fields are assigned; after that the accessors read the fields directly.
+ private static final byte STAGE_INITIALIZING = -1;
+ private static final byte STAGE_UNINITIALIZED = 0;
+ private static final byte STAGE_INITIALIZED = 1;
+ @SuppressWarnings("Immutable")
+ private transient volatile InitShim initShim = new InitShim();
+
+ private final class InitShim {
+ private byte relBuilderFactoryBuildStage = STAGE_UNINITIALIZED;
+ private RelBuilderFactory relBuilderFactory;
+
+ RelBuilderFactory relBuilderFactory() {
+ if (relBuilderFactoryBuildStage == STAGE_INITIALIZING) {
+ throw new IllegalStateException(formatInitCycleMessage());
+ }
+ if (relBuilderFactoryBuildStage == STAGE_UNINITIALIZED) {
+ relBuilderFactoryBuildStage = STAGE_INITIALIZING;
+ this.relBuilderFactory =
Objects.requireNonNull(relBuilderFactoryInitialize(), "relBuilderFactory");
+ relBuilderFactoryBuildStage = STAGE_INITIALIZED;
+ }
+ return this.relBuilderFactory;
+ }
+
+ void withRelBuilderFactory(RelBuilderFactory relBuilderFactory) {
+ this.relBuilderFactory = relBuilderFactory;
+ relBuilderFactoryBuildStage = STAGE_INITIALIZED;
+ }
+
+ private byte operandSupplierBuildStage = STAGE_UNINITIALIZED;
+ private RelRule.OperandTransform operandSupplier;
+
+ RelRule.OperandTransform operandSupplier() {
+ if (operandSupplierBuildStage == STAGE_INITIALIZING) {
+ throw new IllegalStateException(formatInitCycleMessage());
+ }
+ if (operandSupplierBuildStage == STAGE_UNINITIALIZED) {
+ operandSupplierBuildStage = STAGE_INITIALIZING;
+ this.operandSupplier =
Objects.requireNonNull(operandSupplierInitialize(), "operandSupplier");
+ operandSupplierBuildStage = STAGE_INITIALIZED;
+ }
+ return this.operandSupplier;
+ }
+
+ void withOperandSupplier(RelRule.OperandTransform operandSupplier) {
+ this.operandSupplier = operandSupplier;
+ operandSupplierBuildStage = STAGE_INITIALIZED;
+ }
+
+ // Lists the attributes currently mid-initialization, for the cycle error message.
+ private String formatInitCycleMessage() {
+ List<String> attributes = new ArrayList<>();
+ if (relBuilderFactoryBuildStage == STAGE_INITIALIZING) {
+ attributes.add("relBuilderFactory");
+ }
+ if (operandSupplierBuildStage == STAGE_INITIALIZING) {
+ attributes.add("operandSupplier");
+ }
+ return "Cannot build Config, attribute initializers form cycle " +
attributes;
+ }
+ }
+
+ // Defaults for attributes that were not set explicitly, taken from the interface's default methods.
+ private RelBuilderFactory relBuilderFactoryInitialize() {
+ return PinotSortExchangeCopyRule.Config.super.relBuilderFactory();
+ }
+
+ private RelRule.OperandTransform operandSupplierInitialize() {
+ return PinotSortExchangeCopyRule.Config.super.operandSupplier();
+ }
+
+ /**
+ * @return The value of the {@code relBuilderFactory} attribute
+ */
+ @Override
+ public RelBuilderFactory relBuilderFactory() {
+ InitShim shim = this.initShim;
+ return shim != null ? shim.relBuilderFactory() : this.relBuilderFactory;
+ }
+
+ /**
+ * @return The value of the {@code description} attribute
+ */
+ @Override
+ public @Nullable String description() {
+ return description;
+ }
+
+ /**
+ * @return The value of the {@code operandSupplier} attribute
+ */
+ @Override
+ public RelRule.OperandTransform operandSupplier() {
+ InitShim shim = this.initShim;
+ return shim != null ? shim.operandSupplier() : this.operandSupplier;
+ }
+
+ /**
+ * Copy the current immutable object by setting a value for the
+ * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory()
relBuilderFactory} attribute.
+ * A shallow reference equality check is used to prevent copying of the
same value by returning {@code this}.
+ * @param value A new value for relBuilderFactory
+ * @return A modified copy of the {@code this} object
+ */
+ public final ImmutableSortExchangeCopyRule.Config
withRelBuilderFactory(RelBuilderFactory value) {
+ if (this.relBuilderFactory == value) {
+ return this;
+ }
+ RelBuilderFactory newValue = Objects.requireNonNull(value,
"relBuilderFactory");
+ return validate(new ImmutableSortExchangeCopyRule.Config(newValue,
this.description, this.operandSupplier));
+ }
+
+ /**
+ * Copy the current immutable object by setting a value for the {@link
PinotSortExchangeCopyRule.Config#description()
+ * description} attribute.
+ * An equals check used to prevent copying of the same value by returning
{@code this}.
+ * @param value A new value for description (can be {@code null})
+ * @return A modified copy of the {@code this} object
+ */
+ public final ImmutableSortExchangeCopyRule.Config withDescription(
+ @Nullable String value) {
+ if (Objects.equals(this.description, value)) {
+ return this;
+ }
+ return validate(new
ImmutableSortExchangeCopyRule.Config(this.relBuilderFactory, value,
this.operandSupplier));
+ }
+
+ /**
+ * Copy the current immutable object by setting a value for the
+ * {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier}
attribute.
+ * A shallow reference equality check is used to prevent copying of the
same value by returning {@code this}.
+ * @param value A new value for operandSupplier
+ * @return A modified copy of the {@code this} object
+ */
+ public final ImmutableSortExchangeCopyRule.Config
withOperandSupplier(RelRule.OperandTransform value) {
+ if (this.operandSupplier == value) {
+ return this;
+ }
+ RelRule.OperandTransform newValue = Objects.requireNonNull(value,
"operandSupplier");
+ return validate(new
ImmutableSortExchangeCopyRule.Config(this.relBuilderFactory, this.description,
newValue));
+ }
+
+ /**
+ * This instance is equal to all instances of {@code Config} that have
equal attribute values.
+ * @return {@code true} if {@code this} is equal to {@code another}
instance
+ */
+ @Override
+ public boolean equals(@Nullable Object another) {
+ if (this == another) {
+ return true;
+ }
+ return another instanceof ImmutableSortExchangeCopyRule.Config &&
equalTo(
+ (ImmutableSortExchangeCopyRule.Config) another);
+ }
+
+ private boolean equalTo(ImmutableSortExchangeCopyRule.Config another) {
+ return relBuilderFactory.equals(another.relBuilderFactory) &&
Objects.equals(description, another.description)
+ && operandSupplier.equals(another.operandSupplier);
+ }
+
+ /**
+ * Computes a hash code from attributes: {@code relBuilderFactory}, {@code
description}, {@code operandSupplier}.
+ * @return hashCode value
+ */
+ @Override
+ public int hashCode() {
+ int h = 5381;
+ h += (h << 5) + relBuilderFactory.hashCode();
+ h += (h << 5) + Objects.hashCode(description);
+ h += (h << 5) + operandSupplier.hashCode();
+ return h;
+ }
+
+ /**
+ * Prints the immutable value {@code Config} with attribute values.
+ * @return A string representation of the value
+ */
+ @Override
+ public String toString() {
+ return
MoreObjects.toStringHelper("Config").omitNullValues().add("relBuilderFactory",
relBuilderFactory)
+ .add("description", description).add("operandSupplier",
operandSupplier).toString();
+ }
+
+ private static final ImmutableSortExchangeCopyRule.Config INSTANCE =
+ validate(new ImmutableSortExchangeCopyRule.Config());
+
+ /**
+ * Returns the default immutable singleton value of {@code Config}
+ * @return An immutable instance of Config
+ */
+ public static ImmutableSortExchangeCopyRule.Config of() {
+ return INSTANCE;
+ }
+
+ // Canonicalizes instances: returns the shared INSTANCE when the candidate has equal attributes.
+ // While INSTANCE itself is being initialized it is still null, so that first call returns the candidate.
+ private static ImmutableSortExchangeCopyRule.Config
validate(ImmutableSortExchangeCopyRule.Config instance) {
+ return INSTANCE != null && INSTANCE.equalTo(instance) ? INSTANCE :
instance;
+ }
+
+ /**
+ * Creates an immutable copy of a {@link PinotSortExchangeCopyRule.Config}
value.
+ * Uses accessors to get values to initialize the new immutable instance.
+ * If an instance is already immutable, it is returned as is.
+ * @param instance The instance to copy
+ * @return A copied immutable Config instance
+ */
+ public static ImmutableSortExchangeCopyRule.Config
copyOf(PinotSortExchangeCopyRule.Config instance) {
+ if (instance instanceof ImmutableSortExchangeCopyRule.Config) {
+ return (ImmutableSortExchangeCopyRule.Config) instance;
+ }
+ return
ImmutableSortExchangeCopyRule.Config.builder().from(instance).build();
+ }
+
+ /**
+ * Creates a builder for {@link ImmutableSortExchangeCopyRule.Config
Config}.
+ * <pre>
+ * ImmutableSortExchangeCopyRule.Config.builder()
+ * .withRelBuilderFactory(org.apache.calcite.tools.RelBuilderFactory)
+ * // optional {@link PinotSortExchangeCopyRule.Config#relBuilderFactory()
relBuilderFactory}
+ *
.withDescription(@org.checkerframework.checker.nullness.qual.Nullable String |
null)
+ * // nullable {@link PinotSortExchangeCopyRule.Config#description()
description}
+ *
.withOperandSupplier(org.apache.calcite.plan.RelRule.OperandTransform)
+ * // optional {@link PinotSortExchangeCopyRule.Config#operandSupplier()
operandSupplier}
+ * .build();
+ * </pre>
+ * @return A new Config builder
+ */
+ public static ImmutableSortExchangeCopyRule.Config.Builder builder() {
+ return new ImmutableSortExchangeCopyRule.Config.Builder();
+ }
+
+ /**
+ * Builds instances of type {@link ImmutableSortExchangeCopyRule.Config
Config}.
+ * Initialize attributes and then invoke the {@link #build()} method to
create an
+ * immutable instance.
+ * <p><em>{@code Builder} is not thread-safe and generally should not be
stored in a field or collection,
+ * but instead used immediately to create instances.</em>
+ */
+ @NotThreadSafe
+ public static final class Builder {
+ private @Nullable RelBuilderFactory relBuilderFactory;
+ private @Nullable String description;
+ private @Nullable RelRule.OperandTransform operandSupplier;
+
+ private Builder() {
+ }
+
+ /**
+ * Fill a builder with attribute values from the provided {@code
org.apache.calcite.plan.RelRule.Config} instance.
+ * @param instance The instance from which to copy values
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder from(RelRule.Config instance) {
+ Objects.requireNonNull(instance, "instance");
+ from((Object) instance);
+ return this;
+ }
+
+ /**
+ * Fill a builder with attribute values from the provided
+ * {@code org.apache.calcite.rel.rules.PinotSortExchangeCopyRule.Config} instance.
+ * @param instance The instance from which to copy values
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder from(PinotSortExchangeCopyRule.Config instance) {
+ Objects.requireNonNull(instance, "instance");
+ from((Object) instance);
+ return this;
+ }
+
+ // Copies relBuilderFactory and operandSupplier from any RelRule.Config, plus description when non-null.
+ private void from(Object object) {
+ if (object instanceof RelRule.Config) {
+ RelRule.Config instance = (RelRule.Config) object;
+ withRelBuilderFactory(instance.relBuilderFactory());
+ withOperandSupplier(instance.operandSupplier());
+ @Nullable
+ String descriptionValue =
+ instance.description();
+ if (descriptionValue != null) {
+ withDescription(descriptionValue);
+ }
+ }
+ }
+
+ /**
+ * Initializes the value for the {@link
PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory}
+ * attribute.
+ * <p><em>If not set, this attribute will have a default value as
returned by the initializer of
+ * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory()
relBuilderFactory}.</em>
+ * @param relBuilderFactory The value for relBuilderFactory
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder withRelBuilderFactory(RelBuilderFactory
relBuilderFactory) {
+ this.relBuilderFactory = Objects.requireNonNull(relBuilderFactory,
"relBuilderFactory");
+ return this;
+ }
+
+ /**
+ * Initializes the value for the {@link
PinotSortExchangeCopyRule.Config#description() description} attribute.
+ * @param description The value for description (can be {@code null})
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder withDescription(
+ @Nullable String description) {
+ this.description = description;
+ return this;
+ }
+
+ /**
+ * Initializes the value for the {@link
PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier} attribute.
+ * <p><em>If not set, this attribute will have a default value as
returned by the initializer of
+ * {@link PinotSortExchangeCopyRule.Config#operandSupplier()
operandSupplier}.</em>
+ * @param operandSupplier The value for operandSupplier
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder withOperandSupplier(RelRule.OperandTransform
operandSupplier) {
+ this.operandSupplier = Objects.requireNonNull(operandSupplier,
"operandSupplier");
+ return this;
+ }
+
+ /**
+ * Builds a new {@link ImmutableSortExchangeCopyRule.Config Config}.
+ * @return An immutable instance of Config
+ * @throws java.lang.IllegalStateException if any required attributes
are missing
+ */
+ public ImmutableSortExchangeCopyRule.Config build() {
+ return ImmutableSortExchangeCopyRule.Config.validate(new
ImmutableSortExchangeCopyRule.Config(this));
+ }
+ }
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 4ed6be0a23..1828abd0e8 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -59,9 +59,15 @@ public class PinotQueryRuleSets {
CoreRules.PROJECT_MERGE,
// remove identity project
CoreRules.PROJECT_REMOVE,
+ // add an extra exchange for sort
+ PinotSortExchangeNodeInsertRule.INSTANCE,
+ // copy exchanges down
+ PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,
// reorder sort and projection
CoreRules.SORT_PROJECT_TRANSPOSE,
+ // TODO: evaluate the SORT_JOIN_TRANSPOSE and SORT_JOIN_COPY rules
+
// join rules
CoreRules.JOIN_PUSH_EXPRESSIONS,
@@ -89,7 +95,6 @@ public class PinotQueryRuleSets {
// Pinot specific rules
PinotFilterExpandSearchRule.INSTANCE,
PinotJoinExchangeNodeInsertRule.INSTANCE,
- PinotAggregateExchangeNodeInsertRule.INSTANCE,
- PinotSortExchangeNodeInsertRule.INSTANCE
+ PinotAggregateExchangeNodeInsertRule.INSTANCE
);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
new file mode 100644
index 0000000000..7f163ca99b
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.query.planner.logical.RexExpressionUtils;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+
+
+/**
+ * Copies a {@link Sort} that sits directly on top of a {@link SortExchange} to below that exchange. The input
+ * of the exchange is then already sorted before it crosses the exchange, and already truncated when the sort
+ * has a limit.
+ *
+ * <p>The copy placed under the exchange has no offset and a fetch of {@code offset + fetch}. The offset can
+ * only be applied after the exchange inputs are combined, so each input must keep enough rows for the
+ * top-level sort to skip {@code offset} rows and still return {@code fetch} rows. The original sort stays
+ * above the exchange and produces the final result.
+ */
+public class PinotSortExchangeCopyRule extends RelRule<RelRule.Config> {
+
+  public static final PinotSortExchangeCopyRule SORT_EXCHANGE_COPY =
+      PinotSortExchangeCopyRule.Config.DEFAULT.toRule();
+  private static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
+
+  /**
+   * Creates a PinotSortExchangeCopyRule.
+   */
+  protected PinotSortExchangeCopyRule(Config config) {
+    super(config);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Sort sort = call.rel(0);
+    final SortExchange exchange = call.rel(1);
+    final RelMetadataQuery metadataQuery = call.getMetadataQuery();
+
+    if (RelMdUtil.checkInputForCollationAndLimit(
+        metadataQuery,
+        exchange.getInput(),
+        sort.getCollation(),
+        sort.offset,
+        sort.fetch)) {
+      // Don't rewrite anything if the input is already sorted AND the
+      // input node would already return fewer than sort.offset + sort.fetch
+      // rows (e.g. there is already an inner limit applied)
+      return;
+    }
+
+    RelCollation collation = sort.getCollation();
+    Preconditions.checkArgument(
+        collation.equals(exchange.getCollation()),
+        "Expected collation on exchange and sort to be the same"
+    );
+
+    final RexNode fetch;
+    if (sort.fetch == null) {
+      // No limit: only the ordering is pushed down.
+      fetch = null;
+    } else if (sort.offset == null) {
+      fetch = sort.fetch;
+    } else {
+      // Add in long arithmetic and cap at Integer.MAX_VALUE (the largest value an INTEGER literal can
+      // hold). A plain int sum of a large LIMIT and OFFSET would overflow into a negative fetch for the
+      // pushed-down sort.
+      int fetchValue = RexExpressionUtils.getValueAsInt(sort.fetch);
+      int offsetValue = RexExpressionUtils.getValueAsInt(sort.offset);
+      int total = (int) Math.min((long) fetchValue + offsetValue, Integer.MAX_VALUE);
+      RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
+      fetch = rexBuilder.makeLiteral(total, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+    }
+
+    final RelNode newExchangeInput =
+        sort.copy(sort.getTraitSet(), exchange.getInput(), collation, null, fetch);
+    final RelNode exchangeCopy =
+        exchange.copy(exchange.getTraitSet(), newExchangeInput, exchange.getDistribution());
+    final RelNode sortCopy =
+        sort.copy(sort.getTraitSet(), exchangeCopy, collation, sort.offset, sort.fetch);
+
+    call.transformTo(sortCopy);
+  }
+
+  /**
+   * Configuration for {@link PinotSortExchangeCopyRule}. {@link #DEFAULT} matches a {@link LogicalSort}
+   * whose single input is a {@link LogicalSortExchange}.
+   */
+  public interface Config extends RelRule.Config {
+
+    Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
+        .withOperandFor(LogicalSort.class, LogicalSortExchange.class);
+
+    @Override default PinotSortExchangeCopyRule toRule() {
+      return new PinotSortExchangeCopyRule(this);
+    }
+
+    /** Defines an operand tree for the given classes. */
+    default Config withOperandFor(Class<? extends Sort> sortClass,
+        Class<? extends SortExchange> exchangeClass) {
+      return withOperandSupplier(b0 ->
+          b0.operand(sortClass).oneInput(b1 ->
+              b1.operand(exchangeClass).anyInputs()))
+          .as(Config.class);
+    }
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index fe57992074..f03b78dc3d 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -23,13 +23,21 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.tools.RelBuilderFactory;
/**
- * Special rule for Pinot, this rule is fixed to always insert exchange after
SORT node.
+ * Rewrites any sort into a sort on top of a {@link LogicalSortExchange} that carries the same collation,
+ * so that the collation can be pushed down to the rest of the tree. This is done for two reasons:
+ * <ol>
+ *   <li>Sort needs to be a distributed operation: if multiple nodes are scanning data, the sort ordering
+ *   must be applied globally.</li>
+ *   <li>It is ideal to push the sort ordering down as far as possible. If upstream nodes can send data in
+ *   sorted order, then we can apply an N-way merge sort and terminate early once every node has sent data
+ *   that can no longer fall within the top OFFSET + LIMIT rows.</li>
+ * </ol>
*/
public class PinotSortExchangeNodeInsertRule extends RelOptRule {
public static final PinotSortExchangeNodeInsertRule INSTANCE =
@@ -54,8 +62,10 @@ public class PinotSortExchangeNodeInsertRule extends
RelOptRule {
@Override
public void onMatch(RelOptRuleCall call) {
Sort sort = call.rel(0);
- // TODO: this is a single value
- LogicalExchange exchange = LogicalExchange.create(sort.getInput(),
RelDistributions.hash(Collections.emptyList()));
+ LogicalSortExchange exchange = LogicalSortExchange.create(
+ sort.getInput(),
+ RelDistributions.hash(Collections.emptyList()),
+ sort.getCollation());
call.transformTo(LogicalSort.create(exchange, sort.getCollation(),
sort.offset, sort.fetch));
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index de6a87ca88..cd29edac30 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -33,7 +33,6 @@ import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.AggregateNode;
@@ -88,8 +87,8 @@ public final class RelToStageConverter {
}
private static StageNode convertLogicalSort(LogicalSort node, int
currentStageId) {
- int fetch = node.fetch == null ? 0 : ((RexLiteral)
node.fetch).getValueAs(Integer.class);
- int offset = node.offset == null ? 0 : ((RexLiteral)
node.offset).getValueAs(Integer.class);
+ int fetch = RexExpressionUtils.getValueAsInt(node.fetch);
+ int offset = RexExpressionUtils.getValueAsInt(node.offset);
return new SortNode(currentStageId,
node.getCollation().getFieldCollations(), fetch, offset,
toDataSchema(node.getRowType()));
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index cc66047651..314b0baa54 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -86,4 +86,14 @@ public class RexExpressionUtils {
}
return result;
}
+
+  /**
+   * Reads an integer out of a literal {@link RexNode}, such as the {@code fetch} or {@code offset} of a sort.
+   *
+   * @param in the node to read; {@code null} (e.g. an absent LIMIT or OFFSET) is treated as 0
+   * @return the literal's value as an integer, or 0 when {@code in} is {@code null}; never {@code null}
+   * @throws IllegalArgumentException if {@code in} is not a {@link RexLiteral} or holds a SQL NULL value
+   */
+  public static Integer getValueAsInt(RexNode in) {
+    if (in == null) {
+      return 0;
+    }
+
+    Preconditions.checkArgument(in instanceof RexLiteral, "expected literal, got " + in);
+    Integer value = ((RexLiteral) in).getValueAs(Integer.class);
+    // Callers unbox the result into an int; fail here with context instead of an anonymous NPE later.
+    Preconditions.checkArgument(value != null, "expected non-null literal, got " + in);
+    return value;
+  }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 5f46b23d26..736794c283 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.core.Exchange;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
@@ -92,7 +92,7 @@ public class StagePlanner {
private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
- RelDistribution distribution = ((LogicalExchange)
node).getDistribution();
+ RelDistribution distribution = ((Exchange) node).getDistribution();
return createSendReceivePair(nextStageRoot, distribution,
currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node,
currentStageId);
@@ -125,7 +125,7 @@ public class StagePlanner {
}
+ /**
+ * Returns whether {@code node} marks a stage boundary; walkRelPlan starts a new stage and a send/receive
+ * pair at every such node. The check matches any Calcite {@link Exchange} rather than only LogicalExchange,
+ * presumably so that sort exchanges (e.g. LogicalSortExchange) also split stages — confirm.
+ */
private boolean isExchangeNode(RelNode node) {
- return (node instanceof LogicalExchange);
+ return (node instanceof Exchange);
}
private int getNewStageId() {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
index 38b2da6c56..4ffe901b20 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -70,7 +70,9 @@ public class SortNode extends AbstractStageNode {
@Override
public String explain() {
- return "SORT" + (_fetch > 0 ? " (LIMIT " + _fetch + ")" : "");
+ // Renders e.g. "SORT LIMIT 2 OFFSET 1"; a non-positive fetch or offset is left out of the string.
+ return String.format("SORT%s%s",
+ (_fetch > 0) ? " LIMIT " + _fetch : "",
+ (_offset > 0) ? " OFFSET " + _offset : "");
}
@Override
diff --git
a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
new file mode 100644
index 0000000000..b92fd79352
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link PinotSortExchangeCopyRule#SORT_EXCHANGE_COPY}, driven through a mocked
+ * {@link RelOptRuleCall} whose rel(0) is a Sort and rel(1) is the SortExchange beneath it.
+ */
+public class PinotSortExchangeCopyRuleTest {
+
+  public static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
+  private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private RelOptRuleCall _call;
+  @Mock
+  private RelNode _input;
+  @Mock
+  private RelOptCluster _cluster;
+  @Mock
+  private RelMetadataQuery _query;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    RelTraitSet traits = RelTraitSet.createEmpty();
+    Mockito.when(_input.getTraitSet()).thenReturn(traits);
+    Mockito.when(_input.getCluster()).thenReturn(_cluster);
+    Mockito.when(_call.getMetadataQuery()).thenReturn(_query);
+    Mockito.when(_query.getMaxRowCount(Mockito.any())).thenReturn(null);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetNoSort() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, null, literal(1));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetPushedDownSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 0);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetYesSort() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetPushedDownSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchNoSortAndPushDownLimitPlusOffset() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(2), literal(1));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetPushedDownSort(sort, exchange);
+
+    // Then: the pushed-down sort has no offset and fetches offset + limit (2 + 1) rows
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 0);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(3));
+  }
+
+  @Test
+  public void shouldMatchSortOnly() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, null);
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetPushedDownSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertNull(innerSort.fetch);
+  }
+
+  @Test
+  public void shouldMatchLimitOffsetAndSort() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, literal(1), literal(2));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetPushedDownSort(sort, exchange);
+
+    // Then: the pushed-down sort has no offset and fetches offset + limit (1 + 2) rows
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(3));
+  }
+
+  @Test
+  public void shouldNotMatchOnlySortAlreadySorted() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, null);
+    Mockito.when(_query.collations(_input)).thenReturn(ImmutableList.of(collation));
+
+    // When / Then:
+    applyRuleAndVerifyNoTransform(sort, exchange);
+  }
+
+  @Test
+  public void shouldNotMatchOffsetNoLimitNoSort() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(1), null);
+
+    // When / Then:
+    applyRuleAndVerifyNoTransform(sort, exchange);
+  }
+
+  /**
+   * Stubs the mocked call so that rel(0) is {@code sort} and rel(1) is {@code exchange}, then fires the rule.
+   */
+  private void fireRule(Sort sort, SortExchange exchange) {
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+  }
+
+  /**
+   * Fires the rule, asserts it produced exactly one LogicalSort -> LogicalSortExchange -> LogicalSort plan, and
+   * returns the inner (pushed-down) sort so the caller can check its collation, offset and fetch.
+   */
+  private LogicalSort applyRuleAndGetPushedDownSort(Sort sort, SortExchange exchange) {
+    fireRule(sort, exchange);
+
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    RelNode exchangeCopy = ((LogicalSort) sortCopy).getInput();
+    Assert.assertTrue(exchangeCopy instanceof LogicalSortExchange);
+    RelNode innerSort = exchangeCopy.getInput(0);
+    Assert.assertTrue(innerSort instanceof LogicalSort);
+    return (LogicalSort) innerSort;
+  }
+
+  /** Fires the rule and asserts that it did not transform the plan. */
+  private void applyRuleAndVerifyNoTransform(Sort sort, SortExchange exchange) {
+    fireRule(sort, exchange);
+    Mockito.verify(_call, Mockito.never()).transformTo(Mockito.any(), Mockito.anyMap());
+  }
+
+  /** Builds an INTEGER literal, matching the form the rule reads FETCH/OFFSET from. */
+  private static RexNode literal(int i) {
+    return REX_BUILDER.makeLiteral(i, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+  }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index eaf119880b..32dcc2c0f6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,8 +20,10 @@ package org.apache.pinot.query.runtime;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -29,10 +31,13 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
@@ -183,10 +188,47 @@ public class QueryRunner {
private InstanceResponseBlock processServerQuery(ServerQueryRequest
serverQueryRequest,
ExecutorService executorService) {
try {
- return _serverExecutor.execute(serverQueryRequest, executorService);
+ InstanceResponseBlock result =
_serverExecutor.execute(serverQueryRequest, executorService);
+
+ if (result.getRows() != null &&
serverQueryRequest.getQueryContext().getOrderByExpressions() != null) {
+ // Columns are only re-arranged for ORDER BY queries: there, the column order of the V1 result's data
+ // schema must be re-projected onto the selection columns so that it matches the projection schema the
+ // Calcite logical operator expects. If other cases turn up where V1 results must be post-processed to
+ // match the expected schema, factor this out and also canonicalize the data types in the same step
+ // (see LeafStageTransferableBlockOperator#canonicalizeRow).
+ DataSchema dataSchema = result.getDataSchema();
+ List<String> selectionColumns =
+
SelectionOperatorUtils.getSelectionColumns(serverQueryRequest.getQueryContext(),
dataSchema);
+
+ int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(selectionColumns, dataSchema);
+ int numColumns = columnIndices.length;
+
+ DataSchema resultDataSchema =
SelectionOperatorUtils.getSchemaForProjection(dataSchema, columnIndices);
+
+ // Extract the result rows
+ // NOTE(review): addFirst below emits rows in the reverse of result.getRows() iteration order. Whether that
+ // order is meaningful depends on the collection getRows() returns here, which this change does not show;
+ // confirm. Final ordering is presumably re-established by the sort above the exchange.
+ LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
+ for (Object[] row : result.getRows()) {
+ assert row != null;
+ Object[] extractedRow = new Object[numColumns];
+ // Copy the selected columns, in projection order, out of the wider V1 row.
+ for (int i = 0; i < numColumns; i++) {
+ Object value = row[columnIndices[i]];
+ // NOTE(review): extractedRow starts out all-null, so this check only skips a no-op assignment.
+ if (value != null) {
+ extractedRow[i] = value;
+ }
+ }
+
+ rowsInSelectionResults.addFirst(extractedRow);
+ }
+
+ return new InstanceResponseBlock(
+ new SelectionResultsBlock(resultDataSchema,
rowsInSelectionResults),
+ serverQueryRequest.getQueryContext());
+ } else {
+ return result;
+ }
} catch (Exception e) {
InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
e.getMessage());
+ // Objects.toString stores the string "null" when the exception has no message, instead of a null value.
+
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
Objects.toString(e.getMessage()));
return errorResponse;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 64a3ed6810..79dcd1f6a6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -106,7 +106,7 @@ public class SortOperator extends
BaseOperator<TransferableBlock> {
LinkedList<Object[]> rows = new LinkedList<>();
while (_rows.size() > _offset) {
Object[] row = _rows.poll();
- rows.addFirst(row);
+ rows.addFirst(row);
}
_isSortedBlockConstructed = true;
if (rows.size() == 0) {
diff --git a/pinot-query-runtime/src/test/resources/queries/OrderBy.json
b/pinot-query-runtime/src/test/resources/queries/OrderBy.json
new file mode 100644
index 0000000000..2c7708743c
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/OrderBy.json
@@ -0,0 +1,232 @@
+{
+ "basic_order_by": {
+ "tables": {
+ "basic": {
+ "schema": [
+ {"name": "col0", "type": "INT"},
+ {"name": "col1", "type": "INT"},
+ {"name": "col2", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 2, "a"],
+ [2, 3, "b"],
+ [3, 1, "c"],
+ [4, 4, "d"],
+ [5, 5, "e"],
+ [6, 6, "f"]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col0 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT 2 OFFSET 0"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col2, col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1, col2 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 2"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 100"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 OFFSET 100"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 DESC"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 DESC LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 ASC"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 ASC LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY power(col1, 2)"},
+ {"sql": "SELECT * FROM {basic} WHERE col1 > 3 ORDER BY col1 LIMIT 2
OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} WHERE col0 > 3 ORDER BY col1 LIMIT 2
OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} WHERE col1 > 3 AND col1 < 5 ORDER BY col1
LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col1 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col1, col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col1, col0, col2 FROM {basic} ORDER BY col1 LIMIT 2
OFFSET 1"},
+ {
+ "ignored": true,
+ "comment": "we don't support ALL statement",
+ "sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT ALL"
+ },
+ {
+ "ignored": true,
+ "comment": "we don't support LIMIT NULL",
+ "sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT NULL"
+ }
+ ]
+ },
+ "order_by_agg": {
+ "tables": {
+ "agg": {
+ "schema": [
+ {"name": "val", "type": "INT"},
+ {"name": "g", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "a"],
+ [2, "a"],
+ [3, "b"],
+ [4, "b"],
+ [5, "c"]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY sum"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY sum
LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g HAVING SUM(val)
> 3 ORDER BY sum LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY g"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY
SUM(val)"},
+ {"sql": "SELECT SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY g"}
+ ]
+ },
+ "order_by_join": {
+ "tables": {
+ "l": {
+ "schema": [
+ {"name": "key", "type": "STRING"},
+ {"name": "lval", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["foo", 3],
+ ["foo", 5],
+ ["bar", 2],
+ ["bar", 4],
+ ["bar", 6]
+ ]
+ },
+ "r": {
+ "schema": [
+ {"name": "key", "type": "STRING"},
+ {"name": "rval", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["foo", 3],
+ ["foo", 7],
+ ["bar", 2],
+ ["bar", 4],
+ ["bar", 8]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY lval,
rval LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT lval FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY
lval, rval LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY
{l}.key"},
+ {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY
{r}.key"},
+ {"sql": "SELECT {l}.key, SUM(lval), SUM(rval) FROM {l} JOIN {r} ON
{l}.key = {r}.key GROUP BY {l}.key ORDER BY {l}.key"}
+ ]
+ },
+ "order_by_subqueries": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "col0", "type": "INT"},
+ {"name": "col1", "type": "INT"},
+ {"name": "col2", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 2, "a"],
+ [2, 3, "b"],
+ [3, 1, "c"],
+ [4, 4, "d"],
+ [5, 5, "e"],
+ [6, 6, "f"]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col0)"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2)"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET
1)"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET
1) ORDER BY col1 LIMIT 3"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET
1) ORDER BY col2 LIMIT 3 OFFSET 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 5 OFFSET
1) ORDER BY col1 LIMIT 3 OFFSET 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 5 OFFSET
1) ORDER BY col1 OFFSET 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2) LIMIT
1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1) ORDER BY
col0"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1) ORDER BY col0
LIMIT 1"}
+ ]
+ },
+ "order_by_float": {
+ "tables": {
+ "floats": {
+ "schema": [
+ {"name": "col0", "type": "FLOAT"},
+ {"name": "col1", "type": "FLOAT"}
+ ],
+ "inputs": [
+ [0.0, 2.2],
+ [3.3, -3.3],
+ [4.4, 1.1],
+ [-5.5, 4.4],
+ [6.6, 5.0],
+ [7.7, -6.6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT col1 FROM {floats} ORDER BY col1 LIMIT 2 OFFSET 1",
+ "description": "Basic test for ordering by a float column with a limit
and offset"
+ },
+ {
+ "sql": "SELECT col0 FROM {floats} ORDER BY col0 * 2 LIMIT 2 OFFSET 1",
+ "description": "Testing order by with an expression involving floats"
+ }
+ ]
+ },
+ "order_by_double": {
+ "tables": {
+ "doubles": {
+ "schema": [
+ {"name": "col0", "type": "DOUBLE"},
+ {"name": "col1", "type": "DOUBLE"}
+ ],
+ "inputs": [
+ [0.0, 2.2],
+ [3.3, -3.3],
+ [4.4, 1.1],
+ [-5.5, 4.4],
+ [6.6, -5.0],
+ [7.7, 6.6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT col1 FROM {doubles} ORDER BY col1 LIMIT 2 OFFSET 1",
+ "description": "Basic test for ordering by a double column with a
limit and offset"
+ },
+ {
+ "sql": "SELECT col0 FROM {doubles} ORDER BY col0 * 2 LIMIT 2 OFFSET 1",
+ "description": "Testing order by with an expression involving doubles"
+ }
+ ]
+ },
+ "order_by_boolean": {
+ "ignored": true,
+ "comment": "fails when we try to canonicalizeRow with ClassCastException -
value is already in memory as boolean but DataSchema expects it to be an int",
+ "tables": {
+ "bools": {
+ "schema": [
+ {"name": "col0", "type": "BOOLEAN"},
+ {"name": "col1", "type": "INT"}
+ ],
+ "inputs": [
+ [true, 2],
+ [false, 3],
+ [false, 1],
+ [true, 4],
+ [true, 5],
+ [false, 6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT col1 FROM {bools} ORDER BY col0, col1 LIMIT 2 OFFSET 1",
+ "description": "Basic test for ordering by a boolean column with a
limit and offset"
+ }
+ ]
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]