http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index a15ab35..21cbc2d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -35,6 +35,10 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor; +import org.apache.phoenix.execute.visitor.ByteCountVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; +import org.apache.phoenix.execute.visitor.RowCountVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.expression.aggregator.Aggregators; @@ -90,25 +94,30 @@ public class ClientAggregatePlan extends ClientProcessingPlan { @Override public Cost getCost() { - Long byteCount = null; - try { - byteCount = getEstimatedBytesToScan(); - } catch (SQLException e) { - // ignored. - } - - if (byteCount == null) { + Double outputBytes = this.accept(new ByteCountVisitor()); + Double inputRows = this.getDelegate().accept(new RowCountVisitor()); + Double rowWidth = this.accept(new AvgRowWidthVisitor()); + if (inputRows == null || outputBytes == null || rowWidth == null) { return Cost.UNKNOWN; } + double inputBytes = inputRows * rowWidth; + double rowsBeforeHaving = RowCountVisitor.aggregate( + RowCountVisitor.filter( + inputRows.doubleValue(), + RowCountVisitor.stripSkipScanFilter( + context.getScan().getFilter())), + groupBy); + double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having); + double bytesBeforeHaving = rowWidth * rowsBeforeHaving; + double bytesAfterHaving = rowWidth * rowsAfterHaving; int parallelLevel = CostUtil.estimateParallelLevel( false, context.getConnection().getQueryServices()); - Cost cost = CostUtil.estimateAggregateCost(byteCount, - groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel); + Cost cost = CostUtil.estimateAggregateCost( + inputBytes, bytesBeforeHaving, groupBy, parallelLevel); if (!orderBy.getOrderByExpressions().isEmpty()) { - double outputBytes = CostUtil.estimateAggregateOutputBytes( - byteCount, groupBy, clientAggregators.getEstimatedByteSize()); - Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel); + Cost orderByCost = CostUtil.estimateOrderByCost( + bytesAfterHaving, outputBytes, parallelLevel); cost = cost.plus(orderByCost); } return super.getCost().plus(cost); @@ -210,7 +219,16 @@ public class ClientAggregatePlan extends ClientProcessingPlan { public GroupBy getGroupBy() { return groupBy; } - + + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + public Expression getHaving() { + return having; + } + private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator { private final List<Expression> groupByExpressions;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java index ac43919..75ba8f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java @@ -85,4 +85,8 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan { public FilterableStatement getStatement() { return statement; } + + public Expression getWhere() { + return where; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java index 5799990..3427f5f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java @@ -26,6 +26,8 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.visitor.ByteCountVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.FilterResultIterator; import org.apache.phoenix.iterate.LimitingResultIterator; @@ -53,28 +55,30 @@ public class ClientScanPlan extends ClientProcessingPlan { @Override public Cost getCost() { - Long byteCount = null; - try { - byteCount = getEstimatedBytesToScan(); - } catch (SQLException e) { - // ignored. - } + Double inputBytes = this.getDelegate().accept(new ByteCountVisitor()); + Double outputBytes = this.accept(new ByteCountVisitor()); - if (byteCount == null) { + if (inputBytes == null || outputBytes == null) { return Cost.UNKNOWN; } - Cost cost = new Cost(0, 0, byteCount); int parallelLevel = CostUtil.estimateParallelLevel( false, context.getConnection().getQueryServices()); + Cost cost = new Cost(0, 0, 0); if (!orderBy.getOrderByExpressions().isEmpty()) { - Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel); + Cost orderByCost = + CostUtil.estimateOrderByCost(inputBytes, outputBytes, parallelLevel); cost = cost.plus(orderByCost); } return super.getCost().plus(cost); } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java index 270ad3d..e3e0264 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java @@ -28,6 +28,9 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; +import org.apache.phoenix.execute.visitor.ByteCountVisitor; +import org.apache.phoenix.execute.visitor.RowCountVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.optimize.Cost; @@ -202,19 +205,18 @@ public class CorrelatePlan extends DelegateQueryPlan { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + public QueryPlan getRhsPlan() { + return rhs; + } + + @Override public Cost getCost() { - Long lhsByteCount = null; - try { - lhsByteCount = delegate.getEstimatedBytesToScan(); - } catch (SQLException e) { - // ignored. - } - Long rhsRowCount = null; - try { - rhsRowCount = rhs.getEstimatedRowsToScan(); - } catch (SQLException e) { - // ignored. - } + Double lhsByteCount = delegate.accept(new ByteCountVisitor()); + Double rhsRowCount = rhs.accept(new RowCountVisitor()); if (lhsByteCount == null || rhsRowCount == null) { return Cost.UNKNOWN; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java index cf0a3cf..0ecf74d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.iterate.CursorResultIterator; import org.apache.phoenix.iterate.LookAheadResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; @@ -51,6 +52,11 @@ public class CursorFetchPlan extends DelegateQueryPlan { return resultIterator; } + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + @Override public ExplainPlan getExplainPlan() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 23a0da6..6ade42e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -48,6 +48,9 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; +import org.apache.phoenix.execute.visitor.RowCountVisitor; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; @@ -63,10 +66,7 @@ import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.optimize.Cost; -import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.parse.ParseNode; -import org.apache.phoenix.parse.SQLParser; -import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.parse.*; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -77,6 +77,7 @@ import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.util.CostUtil; import org.apache.phoenix.util.SQLCloseables; import com.google.common.collect.Lists; @@ -92,6 +93,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private final boolean recompileWhereClause; private final Set<TableRef> tableRefs; private final int maxServerCacheTimeToLive; + private final long serverCacheLimit; private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap(); private HashCacheClient hashClient; private AtomicLong firstJobEndTime; @@ -132,8 +134,11 @@ public class HashJoinPlan extends DelegateQueryPlan { for (SubPlan subPlan : subPlans) { tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs()); } - this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt( + QueryServices services = plan.getContext().getConnection().getQueryServices(); + this.maxServerCacheTimeToLive = services.getProps().getInt( QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); + this.serverCacheLimit = services.getProps().getLong( + QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE); } @Override @@ -270,40 +275,101 @@ public class HashJoinPlan extends DelegateQueryPlan { return statement; } + public HashJoinInfo getJoinInfo() { + return joinInfo; + } + + public SubPlan[] getSubPlans() { + return subPlans; + } + + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + @Override public Cost getCost() { - Long byteCount = null; try { - byteCount = getEstimatedBytesToScan(); - } catch (SQLException e) { - // ignored. - } + Long r = delegate.getEstimatedRowsToScan(); + Double w = delegate.accept(new AvgRowWidthVisitor()); + if (r == null || w == null) { + return Cost.UNKNOWN; + } - if (byteCount == null) { - return Cost.UNKNOWN; - } + int parallelLevel = CostUtil.estimateParallelLevel( + true, getContext().getConnection().getQueryServices()); + + double rowWidth = w; + double rows = RowCountVisitor.filter( + r.doubleValue(), + RowCountVisitor.stripSkipScanFilter( + delegate.getContext().getScan().getFilter())); + double bytes = rowWidth * rows; + Cost cost = Cost.ZERO; + double rhsByteSum = 0.0; + for (int i = 0; i < subPlans.length; i++) { + double lhsBytes = bytes; + Double rhsRows = subPlans[i].getInnerPlan().accept(new RowCountVisitor()); + Double rhsWidth = subPlans[i].getInnerPlan().accept(new AvgRowWidthVisitor()); + if (rhsRows == null || rhsWidth == null) { + return Cost.UNKNOWN; + } + double rhsBytes = rhsWidth * rhsRows; + rows = RowCountVisitor.join(rows, rhsRows, joinInfo.getJoinTypes()[i]); + rowWidth = AvgRowWidthVisitor.join(rowWidth, rhsWidth, joinInfo.getJoinTypes()[i]); + bytes = rowWidth * rows; + cost = cost.plus(CostUtil.estimateHashJoinCost( + lhsBytes, rhsBytes, bytes, subPlans[i].hasKeyRangeExpression(), parallelLevel)); + rhsByteSum += rhsBytes; + } - Cost cost = new Cost(0, 0, byteCount); - Cost lhsCost = delegate.getCost(); - if (keyRangeExpressions != null) { - // The selectivity of the dynamic rowkey filter. - // TODO replace the constant with an estimate value. - double selectivity = 0.01; - lhsCost = lhsCost.multiplyBy(selectivity); - } - Cost rhsCost = Cost.ZERO; - for (SubPlan subPlan : subPlans) { - rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost()); + if (rhsByteSum > serverCacheLimit) { + return Cost.UNKNOWN; + } + + // Calculate the cost of aggregation and ordering that is performed with the HashJoinPlan + if (delegate instanceof AggregatePlan) { + AggregatePlan aggPlan = (AggregatePlan) delegate; + double rowsBeforeHaving = RowCountVisitor.aggregate(rows, aggPlan.getGroupBy()); + double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, aggPlan.getHaving()); + double bytesBeforeHaving = rowWidth * rowsBeforeHaving; + double bytesAfterHaving = rowWidth * rowsAfterHaving; + Cost aggCost = CostUtil.estimateAggregateCost( + bytes, bytesBeforeHaving, aggPlan.getGroupBy(), parallelLevel); + cost = cost.plus(aggCost); + rows = rowsAfterHaving; + bytes = bytesAfterHaving; + } + double outputRows = RowCountVisitor.limit(rows, delegate.getLimit()); + double outputBytes = rowWidth * outputRows; + if (!delegate.getOrderBy().getOrderByExpressions().isEmpty()) { + int parallelLevel2 = CostUtil.estimateParallelLevel( + delegate instanceof ScanPlan, getContext().getConnection().getQueryServices()); + Cost orderByCost = CostUtil.estimateOrderByCost( + bytes, outputBytes, parallelLevel); + cost = cost.plus(orderByCost); + } + + // Calculate the cost of child nodes + Cost lhsCost = new Cost(0, 0, r.doubleValue() * w); + Cost rhsCost = Cost.ZERO; + for (SubPlan subPlan : subPlans) { + rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost()); + } + return cost.plus(lhsCost).plus(rhsCost); + } catch (SQLException e) { } - return cost.plus(lhsCost).plus(rhsCost); + return Cost.UNKNOWN; } - protected interface SubPlan { + public interface SubPlan { public ServerCache execute(HashJoinPlan parent) throws SQLException; public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException; public List<String> getPreSteps(HashJoinPlan parent) throws SQLException; public List<String> getPostSteps(HashJoinPlan parent) throws SQLException; public QueryPlan getInnerPlan(); + public boolean hasKeyRangeExpression(); } public static class WhereClauseSubPlan implements SubPlan { @@ -383,6 +449,11 @@ public class HashJoinPlan extends DelegateQueryPlan { public QueryPlan getInnerPlan() { return plan; } + + @Override + public boolean hasKeyRangeExpression() { + return false; + } } public static class HashSubPlan implements SubPlan { @@ -495,6 +566,11 @@ public class HashJoinPlan extends DelegateQueryPlan { public QueryPlan getInnerPlan() { return plan; } + + @Override + public boolean hasKeyRangeExpression() { + return keyRangeLhsExpression != null; + } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index c9abb69..255fca3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -30,6 +30,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelScanGrouper; @@ -81,6 +82,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { ResultIterator scanner = new ResultIterator() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index d63950c..ed145a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -37,6 +37,8 @@ import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; +import org.apache.phoenix.execute.visitor.ByteCountVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.BaseResultIterators; @@ -202,16 +204,17 @@ public class ScanPlan extends BaseQueryPlan { } catch (SQLException e) { // ignored. } + Double outputBytes = this.accept(new ByteCountVisitor()); - if (byteCount == null) { + if (byteCount == null || outputBytes == null) { return Cost.UNKNOWN; } - Cost cost = new Cost(0, 0, byteCount); int parallelLevel = CostUtil.estimateParallelLevel( true, context.getConnection().getQueryServices()); + Cost cost = new Cost(0, 0, byteCount); if (!orderBy.getOrderByExpressions().isEmpty()) { - Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel); + Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel); cost = cost.plus(orderByCost); } return cost; @@ -320,6 +323,11 @@ public class ScanPlan extends BaseQueryPlan { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override public Long getEstimatedRowsToScan() throws SQLException { if (isSerial) { return serialRowsEstimate; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 2436d1e..978c7b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -47,6 +47,8 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; +import org.apache.phoenix.execute.visitor.ByteCountVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.MappedByteBufferQueue; @@ -171,12 +173,7 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public Cost getCost() { - Long byteCount = null; - try { - byteCount = getEstimatedBytesToScan(); - } catch (SQLException e) { - // ignored. - } + Double byteCount = this.accept(new ByteCountVisitor()); if (byteCount == null) { return Cost.UNKNOWN; @@ -255,7 +252,11 @@ public class SortMergeJoinPlan implements QueryPlan { public boolean isRowKeyOrdered() { return false; } - + + public JoinType getJoinType() { + return type; + } + private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) { SQLException e = null; try { @@ -717,6 +718,11 @@ public class SortMergeJoinPlan implements QueryPlan { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override public Set<TableRef> getSourceRefs() { return tableRefs; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java index f42af56..f869a4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.DelegateResultIterator; import org.apache.phoenix.iterate.FilterResultIterator; @@ -78,4 +79,9 @@ public class TupleProjectionPlan extends DelegateQueryPlan { return iterator; } + + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java index 3b5168c..6114d66 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java @@ -34,6 +34,7 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.LimitingResultIterator; @@ -112,6 +113,10 @@ public class UnionPlan implements QueryPlan { return iterators.getScans(); } + public List<QueryPlan> getSubPlans() { + return plans; + } + @Override public GroupBy getGroupBy() { return groupBy; @@ -230,7 +235,12 @@ public class UnionPlan implements QueryPlan { return false; } - @Override + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override public Operation getOperation() { return statement.getOperation(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java index 51cb67e..0bc3df4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.BaseSingleExpression; import org.apache.phoenix.expression.BaseTerminalExpression; import org.apache.phoenix.expression.Expression; @@ -64,6 +65,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan { return null; } + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + public class UnnestArrayResultIterator extends DelegateResultIterator { private final UnnestArrayElemRefExpression elemRefExpression; private final UnnestArrayElemIndexExpression elemIndexExpression; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java new file mode 100644 index 0000000..9525747 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute.visitor; + +import org.apache.phoenix.compile.ListJarsQueryPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.TraceQueryPlan; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.CorrelatePlan; +import org.apache.phoenix.execute.CursorFetchPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.LiteralResultIterationPlan; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.SortMergeJoinPlan; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.UnionPlan; +import org.apache.phoenix.execute.UnnestArrayPlan; +import org.apache.phoenix.parse.JoinTableNode; + +import java.sql.SQLException; + +/** + * Implementation of QueryPlanVisitor used to get the average number of bytes each + * row for a QueryPlan. + */ +public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> { + + @Override + public Double defaultReturn(QueryPlan plan) { + return null; + } + + @Override + public Double visit(AggregatePlan plan) { + try { + Long byteCount = plan.getEstimatedBytesToScan(); + Long rowCount = plan.getEstimatedRowsToScan(); + if (byteCount != null && rowCount != null) { + if (byteCount == 0) { + return 0.0; + } + if (rowCount != 0) { + return ((double) byteCount) / rowCount; + } + } + } catch (SQLException e) { + } + + return null; + } + + @Override + public Double visit(ScanPlan plan) { + try { + Long byteCount = plan.getEstimatedBytesToScan(); + Long rowCount = plan.getEstimatedRowsToScan(); + if (byteCount != null && rowCount != null) { + if (byteCount == 0) { + return 0.0; + } + if (rowCount != 0) { + return ((double) byteCount) / rowCount; + } + } + } catch (SQLException e) { + } + + return null; + } + + @Override + public Double visit(ClientAggregatePlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(ClientScanPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(LiteralResultIterationPlan plan) { + return (double) plan.getEstimatedSize(); + } + + @Override + public Double visit(TupleProjectionPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(HashJoinPlan plan) { + Double lhsWidth = plan.getDelegate().accept(this); + if (lhsWidth == null) { + return null; + } + JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes(); + HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans(); + Double width = lhsWidth; + for (int i = 0; i < joinTypes.length; i++) { + Double rhsWidth = subPlans[i].getInnerPlan().accept(this); + if (rhsWidth == null) { + return null; + } + width = join(width, rhsWidth, joinTypes[i]); + } + + return width; + } + + @Override + public Double visit(SortMergeJoinPlan plan) { + Double lhsWidth = plan.getLhsPlan().accept(this); + Double rhsWidth = plan.getRhsPlan().accept(this); + if (lhsWidth == null || rhsWidth == null) { + return null; + } + + return join(lhsWidth, rhsWidth, plan.getJoinType()); + } + + @Override + public Double visit(UnionPlan plan) { + Double sum = 0.0; + for (QueryPlan subPlan : plan.getSubPlans()) { + Double avgWidth = subPlan.accept(this); + if (avgWidth == null) { + return null; + } + sum += avgWidth; + } + + return sum / plan.getSubPlans().size(); + } + + @Override + public Double visit(UnnestArrayPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(CorrelatePlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(CursorFetchPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(ListJarsQueryPlan plan) { + return (double) plan.getEstimatedSize(); + } + + @Override + public Double visit(TraceQueryPlan plan) { + return (double) plan.getEstimatedSize(); + } + + + /* + * The below methods provide estimation of row width based on the input row width as well as + * the operator. + */ + + public static double join(double lhsWidth, double rhsWidth, JoinTableNode.JoinType type) { + double width; + switch (type) { + case Inner: + case Left: + case Right: + case Full: { + width = lhsWidth + rhsWidth; + break; + } + case Semi: + case Anti: { + width = lhsWidth; + break; + } + default: { + throw new IllegalArgumentException("Invalid join type: " + type); + } + } + return width; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java new file mode 100644 index 0000000..61a2895 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute.visitor; + +import org.apache.phoenix.compile.ListJarsQueryPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.TraceQueryPlan; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.CorrelatePlan; +import org.apache.phoenix.execute.CursorFetchPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.LiteralResultIterationPlan; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.SortMergeJoinPlan; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.UnionPlan; +import org.apache.phoenix.execute.UnnestArrayPlan; + +/** + * Implementation of QueryPlanVisitor used to get the number of output bytes for a QueryPlan. + */ +public class ByteCountVisitor implements QueryPlanVisitor<Double> { + + @Override + public Double defaultReturn(QueryPlan plan) { + return null; + } + + @Override + public Double visit(AggregatePlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(ScanPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(ClientAggregatePlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(ClientScanPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(LiteralResultIterationPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(TupleProjectionPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(HashJoinPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(SortMergeJoinPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(UnionPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(UnnestArrayPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(CorrelatePlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(CursorFetchPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(ListJarsQueryPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + @Override + public Double visit(TraceQueryPlan plan) { + return getByteCountFromRowCountAndRowWidth(plan); + } + + protected Double getByteCountFromRowCountAndRowWidth(QueryPlan plan) { + Double rowCount = plan.accept(new RowCountVisitor()); + Double rowWidth = plan.accept(new AvgRowWidthVisitor()); + if (rowCount == null || rowWidth == null) { + return null; + } + + return rowCount * rowWidth; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java new file mode 100644 index 0000000..a7ae3af --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute.visitor; + +import org.apache.phoenix.compile.ListJarsQueryPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.TraceQueryPlan; +import org.apache.phoenix.execute.*; + +/** + * + * Visitor for a QueryPlan (which may contain other nested query-plans) + * + */ +public interface QueryPlanVisitor<E> { + E defaultReturn(QueryPlan plan); + E visit(AggregatePlan plan); + E visit(ScanPlan plan); + E visit(ClientAggregatePlan plan); + E visit(ClientScanPlan plan); + E visit(LiteralResultIterationPlan plan); + E visit(TupleProjectionPlan plan); + E visit(HashJoinPlan plan); + E visit(SortMergeJoinPlan plan); + E visit(UnionPlan plan); + E visit(UnnestArrayPlan plan); + E visit(CorrelatePlan plan); + E visit(CursorFetchPlan plan); + E visit(ListJarsQueryPlan plan); + E visit(TraceQueryPlan plan); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java new file mode 100644 index 0000000..58ceea9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute.visitor; + +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.phoenix.compile.GroupByCompiler; +import org.apache.phoenix.compile.ListJarsQueryPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.TraceQueryPlan; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.CorrelatePlan; +import org.apache.phoenix.execute.CursorFetchPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.LiteralResultIterationPlan; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.SortMergeJoinPlan; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.UnionPlan; +import org.apache.phoenix.execute.UnnestArrayPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.filter.BooleanExpressionFilter; +import org.apache.phoenix.parse.JoinTableNode; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +/** + * Implementation of QueryPlanVisitor used to get the number of output rows for a QueryPlan. + */ +public class RowCountVisitor implements QueryPlanVisitor<Double> { + + // An estimate of the ratio of result data from group-by against the input data. + private final static double GROUPING_FACTOR = 0.1; + + private final static double OUTER_JOIN_FACTOR = 1.15; + private final static double INNER_JOIN_FACTOR = 0.85; + private final static double SEMI_OR_ANTI_JOIN_FACTOR = 0.5; + + private final static double UNION_DISTINCT_FACTOR = 0.8; + + @Override + public Double defaultReturn(QueryPlan plan) { + return null; + } + + @Override + public Double visit(AggregatePlan plan) { + try { + Long b = plan.getEstimatedRowsToScan(); + if (b != null) { + return limit( + filter( + aggregate( + filter( + b.doubleValue(), + stripSkipScanFilter( + plan.getContext().getScan().getFilter())), + plan.getGroupBy()), + plan.getHaving()), + plan.getLimit()); + } + } catch (SQLException e) { + } + + return null; + } + + @Override + public Double visit(ScanPlan plan) { + try { + Long b = plan.getEstimatedRowsToScan(); + if (b != null) { + return limit( + filter( + b.doubleValue(), + stripSkipScanFilter(plan.getContext().getScan().getFilter())), + plan.getLimit()); + } + } catch (SQLException e) { + } + + return null; + } + + @Override + public Double visit(ClientAggregatePlan plan) { + Double b = plan.getDelegate().accept(this); + if (b != null) { + return limit( + filter( + aggregate( + filter(b.doubleValue(), plan.getWhere()), + plan.getGroupBy()), + plan.getHaving()), + plan.getLimit()); + } + + return null; + } + + @Override + public Double visit(ClientScanPlan plan) { + if (plan.getLimit() != null) { + return (double) plan.getLimit(); + } + Double b = plan.getDelegate().accept(this); + if (b != null) { + return limit( + filter(b.doubleValue(), plan.getWhere()), + plan.getLimit()); + } + + return null; + } + + @Override + public Double visit(LiteralResultIterationPlan plan) { + return 1.0; + } + + @Override + public Double visit(TupleProjectionPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(HashJoinPlan plan) { + try { + QueryPlan lhsPlan = plan.getDelegate(); + Long b = lhsPlan.getEstimatedRowsToScan(); + if (b == null) { + return null; + } + + Double rows = filter(b.doubleValue(), + stripSkipScanFilter(lhsPlan.getContext().getScan().getFilter())); + JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes(); + HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans(); + for (int i = 0; i < joinTypes.length; i++) { + Double rhsRows = subPlans[i].getInnerPlan().accept(this); + if (rhsRows == null) { + return null; + } + rows = join(rows, rhsRows.doubleValue(), joinTypes[i]); + } + if (lhsPlan instanceof AggregatePlan) { + AggregatePlan aggPlan = (AggregatePlan) lhsPlan; + rows = filter(aggregate(rows, aggPlan.getGroupBy()), aggPlan.getHaving()); + } + return limit(rows, lhsPlan.getLimit()); + } catch (SQLException e) { + } + + return null; + } + + @Override + public Double visit(SortMergeJoinPlan plan) { + Double lhsRows = plan.getLhsPlan().accept(this); + Double rhsRows = plan.getRhsPlan().accept(this); + if (lhsRows != null && rhsRows != null) { + return join(lhsRows, rhsRows, plan.getJoinType()); + } + + return null; + } + + @Override + public Double visit(UnionPlan plan) { + int count = plan.getSubPlans().size(); + double[] inputRows = new double[count]; + for (int i = 0; i < count; i++) { + Double b = plan.getSubPlans().get(i).accept(this); + if (b != null) { + inputRows[i] = b.doubleValue(); + } else { + return null; + } + } + + return limit(union(true, inputRows),plan.getLimit()); + } + + @Override + public Double visit(UnnestArrayPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(CorrelatePlan plan) { + Double lhsRows = plan.getDelegate().accept(this); + if (lhsRows != null) { + return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR; + } + + return null; + } + + @Override + public Double visit(CursorFetchPlan plan) { + return plan.getDelegate().accept(this); + } + + @Override + public Double visit(ListJarsQueryPlan plan) { + return 0.0; + } + + @Override + public Double visit(TraceQueryPlan plan) { + return 0.0; + } + + public static Filter stripSkipScanFilter(Filter filter) { + if (filter == null) { + return null; + } + if (!(filter instanceof FilterList)) { + return filter instanceof BooleanExpressionFilter ? filter : null; + } + FilterList filterList = (FilterList) filter; + if (filterList.getOperator() != FilterList.Operator.MUST_PASS_ALL) { + return filter; + } + List<Filter> list = new ArrayList<>(); + for (Filter f : filterList.getFilters()) { + Filter stripped = stripSkipScanFilter(f); + if (stripped != null) { + list.add(stripped); + } + } + return list.isEmpty() ? null : (list.size() == 1 ? list.get(0) : new FilterList(FilterList.Operator.MUST_PASS_ALL, list)); + } + + + /* + * The below methods provide estimation of row count based on the input row count as well as + * the operator. They should be replaced by more accurate calculation based on histogram and + * a logical operator layer is expect to facilitate this. + */ + + public static double filter(double inputRows, Filter filter) { + if (filter == null) { + return inputRows; + } + return 0.5 * inputRows; + } + + public static double filter(double inputRows, Expression filter) { + if (filter == null) { + return inputRows; + } + return 0.5 * inputRows; + } + + public static double aggregate(double inputRows, GroupByCompiler.GroupBy groupBy) { + if (groupBy.isUngroupedAggregate()) { + return 1.0; + } + return GROUPING_FACTOR * inputRows; + } + + public static double limit(double inputRows, Integer limit) { + if (limit == null) { + return inputRows; + } + return limit; + } + + public static double join(double lhsRows, double[] rhsRows, JoinTableNode.JoinType[] types) { + assert rhsRows.length == types.length; + double rows = lhsRows; + for (int i = 0; i < rhsRows.length; i++) { + rows = join(rows, rhsRows[i], types[i]); + } + return rows; + } + + public static double join(double lhsRows, double rhsRows, JoinTableNode.JoinType type) { + double rows; + switch (type) { + case Inner: { + rows = Math.min(lhsRows, rhsRows); + rows = rows * INNER_JOIN_FACTOR; + break; + } + case Left: + case Right: + case Full: { + rows = Math.max(lhsRows, rhsRows); + rows = rows * OUTER_JOIN_FACTOR; + break; + } + case Semi: + case Anti: { + rows = lhsRows * SEMI_OR_ANTI_JOIN_FACTOR; + break; + } + default: { + throw new IllegalArgumentException("Invalid join type: " + type); + } + } + return rows; + } + + public static double union(boolean all, double... inputRows) { + double rows = 0.0; + for (double d : inputRows) { + rows += d; + } + if (!all) { + rows *= UNION_DISTINCT_FACTOR; + } + return rows; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 0c9e383..f664a9c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -86,6 +86,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.exception.UpgradeRequiredException; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.iterate.MaterializedResultIterator; @@ -732,6 +733,11 @@ public class PhoenixStatement implements Statement, SQLCloseable { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.defaultReturn(this); + } + + @Override public Long getEstimatedRowsToScan() { return estimatedRows; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java index 1d4b8e0..db2b5ff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java @@ -30,51 +30,52 @@ import org.apache.phoenix.query.QueryServices; */ public class CostUtil { - // An estimate of the ratio of result data from group-by against the input data. - private final static double GROUPING_FACTOR = 0.1; - - // Io operations conducted in intermediate evaluations like sorting or aggregation - // should be counted twice since they usually involve both read and write. - private final static double IO_COST_MULTIPLIER = 2.0; - /** - * Estimate the number of output bytes of an aggregate. - * @param byteCount the number of input bytes + * Estimate the cost of an aggregate. + * @param inputBytes the number of input bytes + * @param outputBytes the number of output bytes * @param groupBy the compiled GroupBy object - * @param aggregatorsSize the byte size of aggregators - * @return the output byte count + * @param parallelLevel number of parallel workers or threads + * @return the cost */ - public static double estimateAggregateOutputBytes( - double byteCount, GroupBy groupBy, int aggregatorsSize) { - if (groupBy.isUngroupedAggregate()) { - return aggregatorsSize; - } - return byteCount * GROUPING_FACTOR; + public static Cost estimateAggregateCost( + double inputBytes, double outputBytes, GroupBy groupBy, int parallelLevel) { + double hashMapOverhead = groupBy.isOrderPreserving() || groupBy.isUngroupedAggregate() ? 1 : (outputBytes < 1 ? 1 : outputBytes); + return new Cost(0, 0, (outputBytes + hashMapOverhead * Math.log(inputBytes)) / parallelLevel); } /** - * Estimate the cost of an aggregate. - * @param byteCount the number of input bytes - * @param groupBy the compiled GroupBy object - * @param aggregatorsSize the byte size of aggregators + * Estimate the cost of an order-by + * @param inputBytes the number of input bytes + * @param outputBytes the number of output bytes, which may be different from inputBytes + * depending on whether there is a LIMIT * @param parallelLevel number of parallel workers or threads * @return the cost */ - public static Cost estimateAggregateCost( - double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) { - double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize); - double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0; - return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel); + public static Cost estimateOrderByCost(double inputBytes, double outputBytes, int parallelLevel) { + if (inputBytes < 1) { + inputBytes = 1; + } + return new Cost(0, 0, + (outputBytes + outputBytes * Math.log(inputBytes)) / parallelLevel); } /** - * Estimate the cost of an order-by - * @param byteCount the number of input bytes + * Estimate the cost of a hash-join + * @param lhsBytes the number of left input bytes + * @param rhsBytes the number of right input bytes + * @param outputBytes the number of output bytes * @param parallelLevel number of parallel workers or threads * @return the cost */ - public static Cost estimateOrderByCost(double byteCount, int parallelLevel) { - return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel); + public static Cost estimateHashJoinCost( + double lhsBytes, double rhsBytes, double outputBytes, + boolean hasKeyRangeExpression, int parallelLevel) { + if (rhsBytes < 1) { + rhsBytes = 1; + } + return new Cost(0, 0, + (rhsBytes * Math.log(rhsBytes) + (hasKeyRangeExpression ? 0 : lhsBytes)) / parallelLevel + outputBytes); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 1903dda..69aeaad 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -43,6 +43,7 @@ import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.iterate.ParallelIterators; import org.apache.phoenix.iterate.ParallelScanGrouper; @@ -474,6 +475,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.defaultReturn(this); + } + + @Override public Long getEstimatedRowsToScan() { return null; }