kadirozde commented on a change in pull request #947:
URL: https://github.com/apache/phoenix/pull/947#discussion_r526551255
########## File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ########## @@ -373,198 +389,254 @@ GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesPtr tenant return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals); } } + + @Override + protected boolean isRegionObserverFor(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null || + scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null; + } + /** * Used for an aggregate query in which the key order does not necessarily match the group by * key order. In this case, we must collect all distinct groups within a region into a map, * aggregating as we go. * @param limit TODO */ - private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, - final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, long limit) throws IOException { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(LogUtil.addCustomAnnotations( - "Grouped aggregation over unordered rows with scan " + scan - + ", group by " + expressions + ", aggregators " + aggregators, - ScanUtil.getCustomAnnotations(scan))); - } - RegionCoprocessorEnvironment env = c.getEnvironment(); - Configuration conf = env.getConfiguration(); - int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES); - byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES); - if (estDistValsBytes != null) { - // Allocate 1.5x estimation - estDistVals = Math.max(MIN_DISTINCT_VALUES, - (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); - } - - Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); - boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); - final boolean spillableEnabled = - conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); - final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); - - GroupByCache groupByCache = - GroupByCacheFactory.INSTANCE.newCache( - env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan), - aggregators, estDistVals); - boolean success = false; - try { - boolean hasMore; - Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + + private static class UnorderedGroupByRegionScanner extends BaseRegionScanner { + private final Region region; + private final Pair<Integer, Integer> minMaxQualifiers; + private final boolean useQualifierAsIndex; + private final PTable.QualifierEncodingScheme encodingScheme; + private final ServerAggregators aggregators; + private final long limit; + private final List<Expression> expressions; + private final long pageSizeInMs; + private RegionScanner regionScanner = null; + private final boolean spillableEnabled; + private final GroupByCache groupByCache; + + private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + final Scan scan, final RegionScanner scanner, final List<Expression> expressions, + final ServerAggregators aggregators, final long limit, final long pageSizeInMs) { + super(scanner); + this.region = c.getEnvironment().getRegion(); + minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); + encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + this.aggregators = aggregators; + this.limit = limit; + this.pageSizeInMs = pageSizeInMs; + this.expressions = expressions; + RegionCoprocessorEnvironment env = c.getEnvironment(); + Configuration conf = env.getConfiguration(); 
+ int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES); + byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES); + if (estDistValsBytes != null) { + // Allocate 1.5x estimation + estDistVals = Math.max(MIN_DISTINCT_VALUES, + (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); + } + + spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); + groupByCache = GroupByCacheFactory.INSTANCE.newCache(env, ScanUtil.getTenantId(scan), + ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals); if (LOGGER.isDebugEnabled()) { + LOGGER.debug(LogUtil.addCustomAnnotations( + "Grouped aggregation over unordered rows with scan " + scan + + ", group by " + expressions + ", aggregators " + aggregators, + ScanUtil.getCustomAnnotations(scan))); LOGGER.debug(LogUtil.addCustomAnnotations( "Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } - Region region = c.getEnvironment().getRegion(); + } + + @Override + public boolean next(List<Cell> resultsToReturn) throws IOException { + boolean hasMore; + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + long now; + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); boolean acquiredLock = false; try { region.startRegionOperation(); acquiredLock = true; - synchronized (scanner) { + synchronized (delegate) { + if (regionScanner != null) { + return regionScanner.next(resultsToReturn); + } do { - List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? 
+ new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), + minMaxQualifiers.getSecond(), encodingScheme) : + new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there are // more values after the // ones returned - hasMore = scanner.nextRaw(results); + hasMore = delegate.nextRaw(results); if (!results.isEmpty()) { result.setKeyValues(results); ImmutableBytesPtr key = - TupleUtil.getConcatenatedValue(result, expressions); + TupleUtil.getConcatenatedValue(result, expressions); Aggregator[] rowAggregators = groupByCache.cache(key); // Aggregate values here aggregators.aggregate(rowAggregators, result); } - } while (hasMore && groupByCache.size() < limit); + now = EnvironmentEdgeManager.currentTimeMillis(); + } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeInMs); + if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeInMs) { + // Return a dummy result as we have processed a page worth of rows + // but we are not ready to aggregate + getDummyResult(resultsToReturn); + return true; + } + regionScanner = groupByCache.getScanner(delegate); + // Do not sort here, but sort back on the client instead + // The reason is that if the scan ever extends beyond a region + // (which can happen if we're basing our parallelization split + // points on old metadata), we'll get incorrect query results. + return regionScanner.next(resultsToReturn); } - } finally { + } finally { if (acquiredLock) region.closeRegionOperation(); } + } - RegionScanner regionScanner = groupByCache.getScanner(scanner); - - // Do not sort here, but sort back on the client instead - // The reason is that if the scan ever extends beyond a region - // (which can happen if we're basing our parallelization split - // points on old metadata), we'll get incorrect query results. 
- success = true; - return regionScanner; - } finally { - if (!success) { + @Override + public void close() throws IOException { + if (regionScanner != null) { + regionScanner.close(); + } else { Closeables.closeQuietly(groupByCache); } } } - + /** * Used for an aggregate query in which the key order match the group by key order. In this * case, we can do the aggregation as we scan, by detecting when the group by key changes. * @param limit TODO * @throws IOException */ - private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, - final Scan scan, final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, final long limit) throws IOException { - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(LogUtil.addCustomAnnotations( - "Grouped aggregation over ordered rows with scan " + scan + ", group by " - + expressions + ", aggregators " + aggregators, - ScanUtil.getCustomAnnotations(scan))); + + private static class OrderedGroupByRegionScanner extends BaseRegionScanner { + private final Scan scan; + private final Region region; + private final Pair<Integer, Integer> minMaxQualifiers; + private final boolean useQualifierAsIndex; + private final PTable.QualifierEncodingScheme encodingScheme; + private final ServerAggregators aggregators; + private final long limit; + private Aggregator[] rowAggregators; + private final List<Expression> expressions; + private final long pageSizeInMs; + private long rowCount = 0; + private ImmutableBytesPtr currentKey = null; + + private OrderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + final Scan scan, final RegionScanner scanner, final List<Expression> expressions, + final ServerAggregators aggregators, final long limit, final long pageSizeInMs) { + super(scanner); + this.scan = scan; + this.region = c.getEnvironment().getRegion(); + minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); + encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + this.aggregators = aggregators; + rowAggregators = aggregators.getAggregators(); + this.limit = limit; + this.pageSizeInMs = pageSizeInMs; + this.expressions = expressions; } - final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); - final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); - final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); - return new BaseRegionScanner(scanner) { - private long rowCount = 0; - private ImmutableBytesPtr currentKey = null; - - @Override - public boolean next(List<Cell> results) throws IOException { - boolean hasMore; - boolean atLimit; - boolean aggBoundary = false; - Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); - ImmutableBytesPtr key = null; - Aggregator[] rowAggregators = aggregators.getAggregators(); - // If we're calculating no aggregate functions, we can exit at the - // start of a new row. Otherwise, we have to wait until an agg - int countOffset = rowAggregators.length == 0 ? 1 : 0; - Region region = c.getEnvironment().getRegion(); - boolean acquiredLock = false; - try { - region.startRegionOperation(); - acquiredLock = true; - synchronized (scanner) { - do { - List<Cell> kvs = useQualifierAsIndex ? 
new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); - // Results are potentially returned even when the return - // value of s.next is false - // since this is an indication of whether or not there - // are more values after the - // ones returned - hasMore = scanner.nextRaw(kvs); - if (!kvs.isEmpty()) { - result.setKeyValues(kvs); - key = TupleUtil.getConcatenatedValue(result, expressions); - aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; - if (!aggBoundary) { - aggregators.aggregate(rowAggregators, result); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(LogUtil.addCustomAnnotations( + + @Override + public boolean next(List<Cell> results) throws IOException { + boolean hasMore; + boolean atLimit; + boolean aggBoundary = false; + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + long now; + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + ImmutableBytesPtr key = null; + + // If we're calculating no aggregate functions, we can exit at the + // start of a new row. Otherwise, we have to wait until an agg + int countOffset = rowAggregators.length == 0 ? 1 : 0; + boolean acquiredLock = false; + try { + region.startRegionOperation(); + acquiredLock = true; + synchronized (delegate) { + do { + List<Cell> kvs = useQualifierAsIndex ? 
+ new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), + minMaxQualifiers.getSecond(), encodingScheme) : + new ArrayList<Cell>(); + // Results are potentially returned even when the return + // value of s.next is false + // since this is an indication of whether or not there + // are more values after the + // ones returned + hasMore = delegate.nextRaw(kvs); + if (!kvs.isEmpty()) { + result.setKeyValues(kvs); + key = TupleUtil.getConcatenatedValue(result, expressions); + aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; + if (!aggBoundary) { + aggregators.aggregate(rowAggregators, result); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(LogUtil.addCustomAnnotations( "Row passed filters: " + kvs - + ", aggregated values: " - + Arrays.asList(rowAggregators), + + ", aggregated values: " + + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan))); - } - currentKey = key; } + currentKey = key; } - atLimit = rowCount + countOffset >= limit; - // Do rowCount + 1 b/c we don't have to wait for a complete - // row in the case of a DISTINCT with a LIMIT - } while (hasMore && !aggBoundary && !atLimit); - } - } finally { - if (acquiredLock) region.closeRegionOperation(); + } + atLimit = rowCount + countOffset >= limit; + // Do rowCount + 1 b/c we don't have to wait for a complete + // row in the case of a DISTINCT with a LIMIT + now = EnvironmentEdgeManager.currentTimeMillis(); + } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeInMs); } - - if (currentKey != null) { - byte[] value = aggregators.toBytes(rowAggregators); - KeyValue keyValue = - KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(), + } finally { + if (acquiredLock) region.closeRegionOperation(); + } + if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeInMs) { + // Return a dummy result as we have processed a page worth of rows + // but we are not ready to aggregate + //getDummyResult(results); Review comment: I 
will fix the comment

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org