[ 
https://issues.apache.org/jira/browse/PHOENIX-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814555#comment-17814555
 ] 

ASF GitHub Bot commented on PHOENIX-7106:
-----------------------------------------

tkhurana commented on code in PR #1736:
URL: https://github.com/apache/phoenix/pull/1736#discussion_r1479092574


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java:
##########
@@ -436,36 +622,79 @@ public boolean next(List<Cell> resultsToReturn) throws IOException {
                         hasMore = delegate.nextRaw(results);
                         if (!results.isEmpty()) {
                             if (isDummy(results)) {
-                                getDummyResult(resultsToReturn);
-                                return true;
+                                return getDummyResult(resultsToReturn);
                             }
                             result.setKeyValues(results);
                             ImmutableBytesPtr key =
                                     TupleUtil.getConcatenatedValue(result, expressions);
+                            ImmutableBytesPtr originalRowKey = new ImmutableBytesPtr();
+                            result.getKey(originalRowKey);
+                            currentRowKey = originalRowKey;
                             Aggregator[] rowAggregators = groupByCache.cache(key);
+                            groupByCache.cacheAggregateRowKey(key, originalRowKey);
                             // Aggregate values here
                             aggregators.aggregate(rowAggregators, result);
                         }
                         now = EnvironmentEdgeManager.currentTimeMillis();
-                    } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeMs);
-                    if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
-                        // Return a dummy result as we have processed a page worth of rows
-                        // but we are not ready to aggregate
-                        getDummyResult(resultsToReturn);
-                        return true;
-                    }
+                        if (hasMore && groupByCache.size() < limit &&
+                                (now - startTime) >= pageSizeMs) {
+                            return getDummyResult(resultsToReturn);
+                        }
+                    } while (hasMore && groupByCache.size() < limit);
                     regionScanner = groupByCache.getScanner(delegate);
                     // Do not sort here, but sort back on the client instead
                     // The reason is that if the scan ever extends beyond a region
                     // (which can happen if we're basing our parallelization split
                     // points on old metadata), we'll get incorrect query results.
                     return regionScanner.next(resultsToReturn);
                 }
+            } catch (Exception e) {
+                LOGGER.error("Unordered group-by scanner next encountered error.", e);
+                if (e instanceof IOException) {
+                    throw e;
+                } else {
+                    throw new IOException(e);
+                }
             } finally {
                 if (acquiredLock) region.closeRegionOperation();
             }
         }
 
+        /**
+         * Retrieve dummy rowkey and return to the client.
+         *
+         * @param resultsToReturn dummy cell.
+         * @return true if more rows exist after this one, false if scanner is done.
+         */
+        private boolean getDummyResult(List<Cell> resultsToReturn) {
+            if (lastReturnedRowKey != null) {
+                ScanUtil.getDummyResult(lastReturnedRowKey, resultsToReturn);
+                return true;
+            }
+            if (scanStartRowKey.length > 0 && !ScanUtil.isLocalIndex(scan)) {
+                if (Bytes.compareTo(prevScanStartRowKey, scanStartRowKey) !=
+                        0 || prevScanIncludeStartRowKey != includeStartRowKey) {
+                    byte[] lastByte =
+                            new byte[]{scanStartRowKey[scanStartRowKey.length - 1]};
+                    if (scanStartRowKey.length > 1 && Bytes.compareTo(lastByte,
+                            Bytes.toBytesBinary("\\x00")) == 0) {
+                        byte[] prevKey = new byte[scanStartRowKey.length - 1];

Review Comment:
   This could be extracted into a byte utility function that computes the previous key for a given key.
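
As a rough illustration of the suggestion above, the helper below sketches one way to compute a previous key. The names `PreviousKeySketch` and `previousKey` and the `padLength` parameter are hypothetical and not taken from the PR. Only the trailing `0x00` case mirrors the visible part of the diff. For any other key there is no exact predecessor among byte strings of unbounded length, so the sketch assumes an approximation: decrement the last byte and pad with `0xFF` bytes.

```java
import java.util.Arrays;

// Hypothetical helper; names and the padLength parameter are illustrative only.
final class PreviousKeySketch {
    private PreviousKeySketch() {}

    /**
     * Returns a non-empty key that sorts strictly before {@code key} in
     * unsigned lexicographic byte order.
     *
     * @param key       non-empty rowkey
     * @param padLength number of 0xFF bytes appended when the predecessor
     *                  can only be approximated (assumption of this sketch)
     */
    static byte[] previousKey(byte[] key, int padLength) {
        if (key == null || key.length == 0) {
            throw new IllegalArgumentException("An empty key has no previous key");
        }
        if (padLength < 0) {
            throw new IllegalArgumentException("padLength must not be negative");
        }
        int last = key.length - 1;
        if (key[last] == 0) {
            if (key.length == 1) {
                // The only predecessor would be the empty key, which this
                // sketch refuses to return.
                throw new IllegalArgumentException("No non-empty previous key exists");
            }
            // Dropping a trailing 0x00 yields the exact predecessor.
            return Arrays.copyOf(key, last);
        }
        // Last byte is non-zero: decrement it (no borrow needed) and pad
        // with 0xFF to get close to the original key.
        byte[] prev = new byte[key.length + padLength];
        System.arraycopy(key, 0, prev, 0, key.length);
        prev[last]--;
        Arrays.fill(prev, key.length, prev.length, (byte) 0xFF);
        return prev;
    }
}
```

A larger `padLength` brings the result closer to the original key at the cost of a longer rowkey. Whether a shared utility should take such a parameter or fix the padding internally is a design choice left open here.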





> Data Integrity issues due to invalid rowkeys returned by various coprocessors
> -----------------------------------------------------------------------------
>
>                 Key: PHOENIX-7106
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7106
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.2.0, 5.1.4
>            Reporter: Viraj Jasani
>            Assignee: Viraj Jasani
>            Priority: Blocker
>             Fix For: 5.2.0, 5.1.4
>
>
> The HBase scanner interface expects the server to scan cells from the 
> HFile or block cache and return consistent data, i.e. the rowkeys of the 
> returned cells should stay within the scan boundaries. When a region moves 
> and the scanner needs a reset, or when the current row is too large and the 
> server returns a partial row, the subsequent scanner#next is supposed to 
> return the remaining cells. When this happens, the cell rowkeys returned by 
> the server, i.e. by any coprocessor, are expected to be within the scan 
> boundaries so that the server can reliably perform its validation and 
> return the remaining cells as expected.
> The Phoenix client initiates serial or parallel scans from the aggregators 
> based on the region boundaries, and the scan boundaries are sometimes 
> adjusted based on the key ranges provided by the optimizer, to include 
> tenant boundaries, salt boundaries etc. After the client opens the scanner 
> and performs the scan operation, some of the coprocs return an invalid 
> rowkey in the following cases:
>  # Grouped aggregate queries
>  # Some Ungrouped aggregate queries
>  # Offset queries
>  # Dummy cells returned with empty rowkey
>  # Update statistics queries
>  # Uncovered Index queries
>  # Ordered results at server side
>  # ORDER BY DESC on rowkey
>  # Global Index read-repair
>  # Paging region scanner with HBase scanner reopen
>  # ORDER BY on non-pk column(s) with/without paging
>  # GROUP BY on non-pk column(s) with/without paging
> Since many of these cases return reserved rowkeys, they are unlikely to 
> match the scan or region boundaries. This can cause data integrity issues 
> in certain scenarios, as explained above. An empty rowkey returned by the 
> server can be treated as the end of the region scan by the HBase client.
> With the paging feature enabled, if the page size is kept low, scanners are 
> more likely to return dummy cells, resulting in an increased number of RPC 
> calls for better latency and timeouts. We should return only valid rowkeys 
> within the scan range in all the cases listed above, such as complex 
> aggregate or offset queries.



