[ 
https://issues.apache.org/jira/browse/PHOENIX-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235191#comment-17235191
 ] 

ASF GitHub Bot commented on PHOENIX-6207:
-----------------------------------------

kadirozde commented on a change in pull request #947:
URL: https://github.com/apache/phoenix/pull/947#discussion_r526613025



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
##########
@@ -373,198 +389,254 @@ GroupByCache newCache(RegionCoprocessorEnvironment env, 
ImmutableBytesPtr tenant
             return new InMemoryGroupByCache(env, tenantId, customAnnotations, 
aggregators, estDistVals);
         }
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return 
scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != 
null ||
+                
scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) 
!= null;
+    }
+
     /**
      * Used for an aggregate query in which the key order does not necessarily 
match the group by
      * key order. In this case, we must collect all distinct groups within a 
region into a map,
      * aggregating as we go.
      * @param limit TODO
      */
-    private RegionScanner 
scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
-            final RegionScanner scanner, final List<Expression> expressions,
-            final ServerAggregators aggregators, long limit) throws 
IOException {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(LogUtil.addCustomAnnotations(
-                    "Grouped aggregation over unordered rows with scan " + scan
-                    + ", group by " + expressions + ", aggregators " + 
aggregators,
-                    ScanUtil.getCustomAnnotations(scan)));
-        }
-        RegionCoprocessorEnvironment env = c.getEnvironment();
-        Configuration conf = env.getConfiguration();
-        int estDistVals = 
conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, 
DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
-        byte[] estDistValsBytes = 
scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
-        if (estDistValsBytes != null) {
-            // Allocate 1.5x estimation
-            estDistVals = Math.max(MIN_DISTINCT_VALUES,
-                            (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
-        }
-        
-        Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-        boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
-        final boolean spillableEnabled =
-                conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, 
DEFAULT_GROUPBY_SPILLABLE);
-        final PTable.QualifierEncodingScheme encodingScheme = 
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-
-        GroupByCache groupByCache =
-                GroupByCacheFactory.INSTANCE.newCache(
-                        env, ScanUtil.getTenantId(scan), 
ScanUtil.getCustomAnnotations(scan),
-                        aggregators, estDistVals);
-        boolean success = false;
-        try {
-            boolean hasMore;
-            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+
+    private static class UnorderedGroupByRegionScanner extends 
BaseRegionScanner {
+        private final Region region;
+        private final Pair<Integer, Integer> minMaxQualifiers;
+        private final boolean useQualifierAsIndex;
+        private final PTable.QualifierEncodingScheme encodingScheme;
+        private final ServerAggregators aggregators;
+        private final long limit;
+        private final List<Expression> expressions;
+        private final long pageSizeInMs;
+        private RegionScanner regionScanner = null;
+        private final boolean spillableEnabled;
+        private final GroupByCache groupByCache;
+
+        private UnorderedGroupByRegionScanner(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+                                            final Scan scan, final 
RegionScanner scanner, final List<Expression> expressions,
+                                            final ServerAggregators 
aggregators, final long limit, final long pageSizeInMs) {
+            super(scanner);
+            this.region = c.getEnvironment().getRegion();
+            minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+            useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+            encodingScheme = 
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+            this.aggregators = aggregators;
+            this.limit = limit;
+            this.pageSizeInMs = pageSizeInMs;
+            this.expressions = expressions;
+            RegionCoprocessorEnvironment env = c.getEnvironment();
+            Configuration conf = env.getConfiguration();
+            int estDistVals = 
conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, 
DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
+            byte[] estDistValsBytes = 
scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
+            if (estDistValsBytes != null) {
+                // Allocate 1.5x estimation
+                estDistVals = Math.max(MIN_DISTINCT_VALUES,
+                        (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
+            }
+
+            spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, 
DEFAULT_GROUPBY_SPILLABLE);
+            groupByCache = GroupByCacheFactory.INSTANCE.newCache(env, 
ScanUtil.getTenantId(scan),
+                    ScanUtil.getCustomAnnotations(scan), aggregators, 
estDistVals);
             if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(LogUtil.addCustomAnnotations(
+                        "Grouped aggregation over unordered rows with scan " + 
scan
+                                + ", group by " + expressions + ", aggregators 
" + aggregators,
+                        ScanUtil.getCustomAnnotations(scan)));
                 LOGGER.debug(LogUtil.addCustomAnnotations(
                         "Spillable groupby enabled: " + spillableEnabled,
                         ScanUtil.getCustomAnnotations(scan)));
             }
-            Region region = c.getEnvironment().getRegion();
+        }
+
+        @Override
+        public boolean next(List<Cell> resultsToReturn) throws IOException {
+            boolean hasMore;
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            long now;
+            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
             boolean acquiredLock = false;
             try {
                 region.startRegionOperation();
                 acquiredLock = true;
-                synchronized (scanner) {
+                synchronized (delegate) {
+                    if (regionScanner != null) {
+                        return regionScanner.next(resultsToReturn);
+                    }
                     do {
-                        List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
+                        List<Cell> results = useQualifierAsIndex ?
+                                new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
+                                        minMaxQualifiers.getSecond(), 
encodingScheme) :
+                                new ArrayList<Cell>();
                         // Results are potentially returned even when the 
return
                         // value of s.next is false
                         // since this is an indication of whether or not there 
are
                         // more values after the
                         // ones returned
-                        hasMore = scanner.nextRaw(results);
+                        hasMore = delegate.nextRaw(results);
                         if (!results.isEmpty()) {
                             result.setKeyValues(results);
                             ImmutableBytesPtr key =
-                                TupleUtil.getConcatenatedValue(result, 
expressions);
+                                    TupleUtil.getConcatenatedValue(result, 
expressions);
                             Aggregator[] rowAggregators = 
groupByCache.cache(key);
                             // Aggregate values here
                             aggregators.aggregate(rowAggregators, result);
                         }
-                    } while (hasMore && groupByCache.size() < limit);
+                        now = EnvironmentEdgeManager.currentTimeMillis();
+                    } while (hasMore && groupByCache.size() < limit && (now - 
startTime) < pageSizeInMs);
+                    if (hasMore && groupByCache.size() < limit && (now - 
startTime) >= pageSizeInMs) {
+                        // Return a dummy result as we have processed a page 
worth of rows
+                        // but we are not ready to aggregate
+                        getDummyResult(resultsToReturn);
+                        return true;
+                    }
+                    regionScanner = groupByCache.getScanner(delegate);
+                    // Do not sort here, but sort back on the client instead
+                    // The reason is that if the scan ever extends beyond a 
region
+                    // (which can happen if we're basing our parallelization 
split
+                    // points on old metadata), we'll get incorrect query 
results.
+                    return regionScanner.next(resultsToReturn);
                 }
-            }  finally {
+            } finally {
                 if (acquiredLock) region.closeRegionOperation();
             }
+        }
 
-            RegionScanner regionScanner = groupByCache.getScanner(scanner);
-
-            // Do not sort here, but sort back on the client instead
-            // The reason is that if the scan ever extends beyond a region
-            // (which can happen if we're basing our parallelization split
-            // points on old metadata), we'll get incorrect query results.
-            success = true;
-            return regionScanner;
-        } finally {
-            if (!success) {
+        @Override
+        public void close() throws IOException {
+            if (regionScanner != null) {
+                regionScanner.close();
+            } else {

Review comment:
       Actually, it is no longer needed, as I introduced a new variable 
called regionScanner, which is the scanner returned by the cache. Previously, 
success was set to true only when the cache returned the scanner successfully. 
In this patch, that condition corresponds to regionScanner != null. When 
regionScanner is not null, we call close() on regionScanner, which closes the 
cache. Otherwise, we close the cache directly, i.e., 
Closeables.closeQuietly(groupByCache).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Paged server side grouped aggregate operations
> ----------------------------------------------
>
>                 Key: PHOENIX-6207
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6207
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.14.3
>            Reporter: Kadir OZDEMIR
>            Assignee: Kadir OZDEMIR
>            Priority: Major
>             Fix For: 4.16.0
>
>         Attachments: PHOENIX-6207.4.x.001.patch, PHOENIX-6207.4.x.002.patch, 
> PHOENIX-6207.4.x.003.patch, PHOENIX-6207.4.x.004.patch, 
> PHOENIX-6207.4.x.005.patch, PHOENIX-6207.master.001.patch
>
>
> Phoenix provides the option of performing query operations on the client or 
> server side. This is decided by the Phoenix optimizer based on configuration 
> parameters. For the server side option, the table operation is parallelized 
> such that multiple table regions are scanned. However, there is currently no 
> paging capability, and the server side operation can take long enough to lead 
> to HBase client timeouts. Putting a limit on the processing time within a 
> single RPC call (i.e., the next operation time on the aggregate scanner) on 
> the server side using Phoenix-level paging is highly desirable. A similar 
> paging mechanism has already been implemented for index rebuild and 
> verification operations and has proven effective at preventing timeouts. This 
> Jira is for implementing this paging for the server side grouped aggregate 
> operations. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to