This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch 0.2.0 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e7c9376b6ca116bd0ed852fe1cea18637b3f14bc Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Fri Oct 11 17:58:52 2019 -0700 Hotfix for Selection Comparator - Enhance the comparator for selection order-by queries to avoid doing row based switch - Enhance the 'SELECT *' to not compile the expression - Do not project order-by columns for 'LIMIT 0' (EmptySelection) case --- .../operator/query/SelectionOrderByOperator.java | 87 +++++++++++----------- .../apache/pinot/core/plan/TransformPlanNode.java | 64 +++++++++------- .../query/selection/SelectionOperatorService.java | 49 ++++++------ .../query/selection/SelectionOperatorUtils.java | 61 +++++++++++++-- 4 files changed, 161 insertions(+), 100 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index eee2ceb..f6cf5a6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator.query; import java.io.Serializable; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; @@ -88,51 +89,49 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl } private Comparator<Serializable[]> getComparator() { - return (o1, o2) -> { - int numOrderByExpressions = _sortSequence.size(); - for (int i = 0; i < numOrderByExpressions; i++) { - // Only compare single-value columns - if (!_expressionMetadata[i].isSingleValue()) { - continue; - } - - Serializable v1 = o1[i]; - Serializable v2 = o2[i]; - - int result; - switch (_expressionMetadata[i].getDataType()) { - case INT: - result = ((Integer) v1).compareTo((Integer) v2); - break; - case LONG: - result = ((Long) v1).compareTo((Long) 
v2); - break; - case FLOAT: - result = ((Float) v1).compareTo((Float) v2); - break; - case DOUBLE: - result = ((Double) v1).compareTo((Double) v2); - break; - case STRING: - result = ((String) v1).compareTo((String) v2); - break; - case BYTES: - result = ByteArray.compare((byte[]) v1, (byte[]) v2); - break; - default: - throw new IllegalStateException(); - } - - if (result != 0) { - if (_sortSequence.get(i).isIsAsc()) { - return -result; - } else { - return result; - } - } + // Compare all single-value columns + int numOrderByExpressions = _sortSequence.size(); + List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions); + for (int i = 0; i < numOrderByExpressions; i++) { + if (_expressionMetadata[i].isSingleValue()) { + valueIndexList.add(i); } - return 0; - }; + } + + int numValuesToCompare = valueIndexList.size(); + int[] valueIndices = new int[numValuesToCompare]; + Comparator[] valueComparators = new Comparator[numValuesToCompare]; + for (int i = 0; i < numValuesToCompare; i++) { + int valueIndex = valueIndexList.get(i); + valueIndices[i] = valueIndex; + switch (_expressionMetadata[valueIndex].getDataType()) { + case INT: + valueComparators[i] = (Comparator<Integer>) Integer::compare; + break; + case LONG: + valueComparators[i] = (Comparator<Long>) Long::compare; + break; + case FLOAT: + valueComparators[i] = (Comparator<Float>) Float::compare; + break; + case DOUBLE: + valueComparators[i] = (Comparator<Double>) Double::compare; + break; + case STRING: + valueComparators[i] = Comparator.naturalOrder(); + break; + case BYTES: + valueComparators[i] = (Comparator<byte[]>) ByteArray::compare; + break; + default: + throw new IllegalStateException(); + } + if (_sortSequence.get(valueIndex).isIsAsc()) { + valueComparators[i] = valueComparators[i].reversed(); + } + } + + return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java index d8ceebb..84d9287 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.plan; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -31,6 +30,7 @@ import org.apache.pinot.common.request.transform.TransformExpressionTree; import org.apache.pinot.core.indexsegment.IndexSegment; import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; +import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,48 +59,60 @@ public class TransformPlanNode implements PlanNode { */ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) { if (brokerRequest.isSetAggregationsInfo()) { + // Extract aggregation expressions for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) { if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { - String expression = AggregationFunctionUtils.getColumn(aggregationInfo); - TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression); - transformExpressionTree.getColumns(_projectionColumns); - _expressions.add(transformExpressionTree); + addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo)); } } - // Process all group-by expressions + // Extract group-by expressions if (brokerRequest.isSetGroupBy()) { - for (String expression : brokerRequest.getGroupBy().getExpressions()) { - TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression); - 
transformExpressionTree.getColumns(_projectionColumns); - _expressions.add(transformExpressionTree); + for (String column : brokerRequest.getGroupBy().getExpressions()) { + addExpressionColumn(column); } } } else { Selection selection = brokerRequest.getSelections(); - List<String> columns = selection.getSelectionColumns(); - if (columns.size() == 1 && columns.get(0).equals("*")) { - columns = new ArrayList<>(indexSegment.getPhysicalColumnNames()); - } - List<SelectionSort> sortSequence = selection.getSelectionSortSequence(); - if (sortSequence == null) { - // For selection only queries, select minimum number of documents. Fetch at least 1 document per - // DocIdSetPlanNode's requirement. - // TODO: Skip the filtering phase and document fetching for LIMIT 0 case - _maxDocPerNextCall = Math.max(Math.min(selection.getSize(), _maxDocPerNextCall), 1); + + // Extract selection expressions + List<String> selectionColumns = selection.getSelectionColumns(); + if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) { + for (String column : indexSegment.getPhysicalColumnNames()) { + _projectionColumns.add(column); + _expressions.add(new TransformExpressionTree(new IdentifierAstNode(column))); + } } else { - for (SelectionSort selectionSort : sortSequence) { - columns.add(selectionSort.getColumn()); + for (String column : selectionColumns) { + addExpressionColumn(column); } } - for (String column : columns) { - TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column); - expression.getColumns(_projectionColumns); - _expressions.add(expression); + + // Extract order-by expressions and update maxDocPerNextCall + if (selection.getSize() > 0) { + List<SelectionSort> sortSequence = selection.getSelectionSortSequence(); + if (sortSequence == null) { + // For selection only queries, select minimum number of documents + _maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall); + } else { + for (SelectionSort 
selectionSort : sortSequence) { + addExpressionColumn(selectionSort.getColumn()); + } + } + } else { + // For LIMIT 0 queries, fetch at least 1 document per DocIdSetPlanNode's requirement + // TODO: Skip the filtering phase and document fetching for LIMIT 0 case + _maxDocPerNextCall = 1; } } } + private void addExpressionColumn(String column) { + TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column); + expression.getColumns(_projectionColumns); + _expressions.add(expression); + } + @Override public TransformOperator run() { return new TransformOperator(_projectionPlanNode.run(), _expressions); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java index 87bd766..b5ecb86 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.selection; import java.io.Serializable; +import java.util.ArrayList; import java.util.Comparator; import java.util.LinkedList; import java.util.List; @@ -89,32 +90,32 @@ public class SelectionOperatorService { * @return flexible {@link Comparator} for selection rows. 
*/ private Comparator<Serializable[]> getTypeCompatibleComparator() { - return (o1, o2) -> { - int numOrderByExpressions = _sortSequence.size(); - for (int i = 0; i < numOrderByExpressions; i++) { - Serializable v1 = o1[i]; - Serializable v2 = o2[i]; - - int result; - // Only compare single-value columns - if (v1 instanceof Number) { - result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue()); - } else if (v1 instanceof String) { - result = ((String) v1).compareTo((String) v2); - } else { - continue; - } + // Compare all single-value columns + int numOrderByExpressions = _sortSequence.size(); + List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions); + for (int i = 0; i < numOrderByExpressions; i++) { + if (!_dataSchema.getColumnDataType(i).isArray()) { + valueIndexList.add(i); + } + } - if (result != 0) { - if (_sortSequence.get(i).isIsAsc()) { - return -result; - } else { - return result; - } - } + int numValuesToCompare = valueIndexList.size(); + int[] valueIndices = new int[numValuesToCompare]; + Comparator[] valueComparators = new Comparator[numValuesToCompare]; + for (int i = 0; i < numValuesToCompare; i++) { + int valueIndex = valueIndexList.get(i); + valueIndices[i] = valueIndex; + if (_dataSchema.getColumnDataType(valueIndex).isNumber()) { // column index, not position in valueIndices + valueComparators[i] = Comparator.comparingDouble(Number::doubleValue); + } else { + valueComparators[i] = Comparator.naturalOrder(); } - return 0; - }; + if (_sortSequence.get(valueIndex).isIsAsc()) { + valueComparators[i] = valueComparators[i].reversed(); + } + } + + return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java index fe9b107..1d3eb8f 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java +++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java @@ -23,6 +23,7 @@ import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -42,6 +43,7 @@ import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.indexsegment.IndexSegment; import org.apache.pinot.core.util.ArrayCopyUtils; +import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode; /** @@ -106,15 +108,23 @@ public class SelectionOperatorUtils { } if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) { + // For 'SELECT *', sort all physical columns so that the order is deterministic selectionColumns = new ArrayList<>(indexSegment.getPhysicalColumnNames()); - // Sort the columns so that the order is deterministic selectionColumns.sort(null); - } - for (String selectionColumn : selectionColumns) { - TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn); - if (expressionSet.add(selectionExpression)) { - expressions.add(selectionExpression); + for (String selectionColumn : selectionColumns) { + TransformExpressionTree selectionExpression = + new TransformExpressionTree(new IdentifierAstNode(selectionColumn)); + if (expressionSet.add(selectionExpression)) { + expressions.add(selectionExpression); + } + } + } else { + for (String selectionColumn : selectionColumns) { + TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn); + if (expressionSet.add(selectionExpression)) { + expressions.add(selectionExpression); + } } } @@ -167,6 +177,7 @@ public class SelectionOperatorUtils { /** * Merge two partial results for selection queries with <code>ORDER BY</code>. 
(Server side) + * TODO: Should use type compatible comparator to compare the rows * * @param mergedRows partial results 1. * @param rowsToMerge partial results 2. @@ -568,4 +579,42 @@ public class SelectionOperatorUtils { queue.offer(value); } } + + /** + * Helper Comparator class to compare rows. + * <p>Two arguments are expected to construct the comparator: + * <ul> + * <li> + * Value indices: an array of column indices in each row where the values need to be compared (only the + * single-value order-by columns need to be compared) + * </li> + * <li> + * Value comparators: an array of Comparator, where each element is the Comparator for the corresponding column in + * the value indices array + * </li> + * </ul> + */ + public static class RowComparator implements Comparator<Serializable[]> { + private final int[] _valueIndices; + private final Comparator[] _valueComparators; + + public RowComparator(int[] valueIndices, Comparator[] valueComparators) { + _valueIndices = valueIndices; + _valueComparators = valueComparators; + } + + @SuppressWarnings("unchecked") + @Override + public int compare(Serializable[] o1, Serializable[] o2) { + int numValuesToCompare = _valueIndices.length; + for (int i = 0; i < numValuesToCompare; i++) { + int valueIndex = _valueIndices[i]; + int result = _valueComparators[i].compare(o1[valueIndex], o2[valueIndex]); + if (result != 0) { + return result; + } + } + return 0; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
