[ 
https://issues.apache.org/jira/browse/PHOENIX-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17229365#comment-17229365
 ] 

ASF GitHub Bot commented on PHOENIX-5998:
-----------------------------------------

kadirozde commented on a change in pull request #936:
URL: https://github.com/apache/phoenix/pull/936#discussion_r520726571



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -516,402 +437,19 @@ public RegionScanner run() throws Exception {
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
-                    getWrappedScanner(c, theScanner, offset, scan, 
dataColumns, tupleProjector, 
-                        region, indexMaintainers == null ? null : 
indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
-        } 
-        
-        if (j != null)  {
-            theScanner = new HashJoinRegionScanner(theScanner, p, j, 
ScanUtil.getTenantId(scan), env, useQualifierAsIndex, 
useNewValueColumnQualifier);
-        }
-        
-        int maxBatchSize = 0;
-        long maxBatchSizeBytes = 0L;
-        MutationList mutations = new MutationList();
-        boolean needToWrite = false;
-        Configuration conf = env.getConfiguration();
-
-        /**
-         * Slow down the writes if the memstore size more than
-         * (hbase.hregion.memstore.block.multiplier - 1) times 
hbase.hregion.memstore.flush.size
-         * bytes. This avoids flush storm to hdfs for cases like index 
building where reads and
-         * write happen to all the table regions in the server.
-         */
-        final long blockingMemStoreSize = getBlockingMemstoreSize(region, 
conf) ;
-
-        boolean buildLocalIndex = indexMaintainers != null && 
dataColumns==null && !localIndexScan;
-        if(buildLocalIndex) {
-            checkForLocalIndexColumnFamilies(region, indexMaintainers);
-        }
-        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
-                || (deleteCQ != null && deleteCF != null) || emptyCF != null 
|| buildLocalIndex) {
-            needToWrite = true;
-            if((isUpsert && (targetHTable == null ||
-                    
!targetHTable.getName().equals(region.getTableDesc().getTableName())))) {
-                needToWrite = false;
-            }
-            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            mutations = new MutationList(Ints.saturatedCast(maxBatchSize + 
maxBatchSize / 10));
-            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-        }
-        boolean hasMore;
-        int rowCount = 0;
-        boolean hasAny = false;
-        boolean acquiredLock = false;
-        boolean incrScanRefCount = false;
-        Aggregators aggregators = null;
-        Aggregator[] rowAggregators = null;
-        final RegionScanner innerScanner = theScanner;
-        final TenantCache tenantCache = GlobalCache.getTenantCache(env, 
ScanUtil.getTenantId(scan));
-        try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
-            aggregators = ServerAggregators.deserialize(
-                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
conf, em);
-            rowAggregators = aggregators.getAggregators();
-            Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " "+region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
-            }
-            boolean useIndexProto = true;
-            byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-            // for backward compatiblity fall back to look by the old attribute
-            if (indexMaintainersPtr == null) {
-                indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
-                useIndexProto = false;
-            }
-    
-            if(needToWrite) {
-                synchronized (lock) {
-                    if (isRegionClosingOrSplitting) {
-                        throw new IOException("Temporarily unable to write 
from scan because region is closing or splitting");
-                    }
-                    scansReferenceCount++;
-                    incrScanRefCount = true;
-                    lock.notifyAll();
-                }
-            }
-            region.startRegionOperation();
-            acquiredLock = true;
-            synchronized (innerScanner) {
-                do {
-                    List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
-                    // Results are potentially returned even when the return 
value of s.next is false
-                    // since this is an indication of whether or not there are 
more values after the
-                    // ones returned
-                    hasMore = innerScanner.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        rowCount++;
-                        result.setKeyValues(results);
-                        if (isDescRowKeyOrderUpgrade) {
-                            Arrays.fill(values, null);
-                            Cell firstKV = results.get(0);
-                            RowKeySchema schema = 
projectedTable.getRowKeySchema();
-                            int maxOffset = 
schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, 
firstKV.getRowLength(), ptr);
-                            for (int i = 0; i < schema.getFieldCount(); i++) {
-                                Boolean hasValue = schema.next(ptr, i, 
maxOffset);
-                                if (hasValue == null) {
-                                    break;
-                                }
-                                Field field = schema.getField(i);
-                                if (field.getSortOrder() == SortOrder.DESC) {
-                                    // Special case for re-writing DESC ARRAY, 
as the actual byte value needs to change in this case
-                                    if (field.getDataType().isArrayType()) {
-                                        field.getDataType().coerceBytes(ptr, 
null, field.getDataType(),
-                                            field.getMaxLength(), 
field.getScale(), field.getSortOrder(), 
-                                            field.getMaxLength(), 
field.getScale(), field.getSortOrder(), true); // force to use correct 
separator byte
-                                    }
-                                    // Special case for re-writing DESC CHAR 
or DESC BINARY, to force the re-writing of trailing space characters
-                                    else if (field.getDataType() == 
PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
-                                        int len = ptr.getLength();
-                                        while (len > 0 && 
ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
-                                            len--;
-                                        }
-                                        ptr.set(ptr.get(), ptr.getOffset(), 
len);
-                                        // Special case for re-writing DESC 
FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
-                                    } else if (field.getDataType() == 
PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
-                                        byte[] invertedBytes = 
SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
-                                        ptr.set(invertedBytes);
-                                    }
-                                } else if (field.getDataType() == 
PBinary.INSTANCE) {
-                                    // Remove trailing space characters so 
that the setValues call below will replace them
-                                    // with the correct zero byte character. 
Note this is somewhat dangerous as these
-                                    // could be legit, but I don't know what 
the alternative is.
-                                    int len = ptr.getLength();
-                                    while (len > 0 && 
ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
-                                        len--;
-                                    }
-                                    ptr.set(ptr.get(), ptr.getOffset(), len);  
                                      
-                                }
-                                values[i] = ptr.copyBytes();
-                            }
-                            writeToTable.newKey(ptr, values);
-                            if (Bytes.compareTo(
-                                firstKV.getRowArray(), firstKV.getRowOffset() 
+ offset, firstKV.getRowLength(), 
-                                ptr.get(),ptr.getOffset() + 
offset,ptr.getLength()) == 0) {
-                                continue;
-                            }
-                            byte[] newRow = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
-                            if (offset > 0) { // for local indexes (prepend 
region start key)
-                                byte[] newRowWithOffset = new byte[offset + 
newRow.length];
-                                System.arraycopy(firstKV.getRowArray(), 
firstKV.getRowOffset(), newRowWithOffset, 0, offset);;
-                                System.arraycopy(newRow, 0, newRowWithOffset, 
offset, newRow.length);
-                                newRow = newRowWithOffset;
-                            }
-                            byte[] oldRow = Bytes.copy(firstKV.getRowArray(), 
firstKV.getRowOffset(), firstKV.getRowLength());
-                            for (Cell cell : results) {
-                                // Copy existing cell but with new row key
-                                Cell newCell = new KeyValue(newRow, 0, 
newRow.length,
-                                    cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength(),
-                                    cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
-                                    cell.getTimestamp(), 
KeyValue.Type.codeToType(cell.getTypeByte()),
-                                    cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
-                                switch 
(KeyValue.Type.codeToType(cell.getTypeByte())) {
-                                case Put:
-                                    // If Put, point delete old Put
-                                    Delete del = new Delete(oldRow);
-                                    del.addDeleteMarker(new 
KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-                                        cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength(),
-                                        cell.getQualifierArray(), 
cell.getQualifierOffset(),
-                                        cell.getQualifierLength(), 
cell.getTimestamp(), KeyValue.Type.Delete,
-                                        ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
-                                    mutations.add(del);
-
-                                    Put put = new Put(newRow);
-                                    put.add(newCell);
-                                    mutations.add(put);
-                                    break;
-                                case Delete:
-                                case DeleteColumn:
-                                case DeleteFamily:
-                                case DeleteFamilyVersion:
-                                    Delete delete = new Delete(newRow);
-                                    delete.addDeleteMarker(newCell);
-                                    mutations.add(delete);
-                                    break;
-                                }
-                            }
-                        } else if (buildLocalIndex) {
-                            for (IndexMaintainer maintainer : 
indexMaintainers) {
-                                if (!results.isEmpty()) {
-                                    result.getKey(ptr);
-                                    ValueGetter valueGetter =
-                                            
maintainer.createGetterFromKeyValues(
-                                                
ImmutableBytesPtr.copyBytesIfNecessary(ptr),
-                                                results);
-                                    Put put = 
maintainer.buildUpdateMutation(kvBuilder,
-                                        valueGetter, ptr, 
results.get(0).getTimestamp(),
-                                        
env.getRegion().getRegionInfo().getStartKey(),
-                                        
env.getRegion().getRegionInfo().getEndKey());
-
-                                    if (txnProvider != null) {
-                                        put = 
txnProvider.markPutAsCommitted(put, ts, ts);
-                                    }
-                                    indexMutations.add(put);
-                                }
-                            }
-                            result.setKeyValues(results);
-                        } else if (isDelete) {
-                            // FIXME: the version of the Delete constructor 
without the lock
-                            // args was introduced in 0.94.4, thus if we try 
to use it here
-                            // we can no longer use the 0.94.2 version of the 
client.
-                            Cell firstKV = results.get(0);
-                            Delete delete = new Delete(firstKV.getRowArray(),
-                                firstKV.getRowOffset(), 
firstKV.getRowLength(),ts);
-                            if (replayMutations != null) {
-                                delete.setAttribute(REPLAY_WRITES, 
replayMutations);
-                            }
-                            mutations.add(delete);
-                            // force tephra to ignore this deletes
-                            
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
-                        } else if (isUpsert) {
-                            Arrays.fill(values, null);
-                            int bucketNumOffset = 0;
-                            if (projectedTable.getBucketNum() != null) {
-                                values[0] = new byte[] { 0 };
-                                bucketNumOffset = 1;
-                            }
-                            int i = bucketNumOffset;
-                            List<PColumn> projectedColumns = 
projectedTable.getColumns();
-                            for (; i < projectedTable.getPKColumns().size(); 
i++) {
-                                Expression expression = 
selectExpressions.get(i - bucketNumOffset);
-                                if (expression.evaluate(result, ptr)) {
-                                    values[i] = ptr.copyBytes();
-                                    // If SortOrder from expression in SELECT 
doesn't match the
-                                    // column being projected into then invert 
the bits.
-                                    if (expression.getSortOrder() !=
-                                            
projectedColumns.get(i).getSortOrder()) {
-                                        SortOrder.invert(values[i], 0, 
values[i], 0,
-                                            values[i].length);
-                                    }
-                                }else{
-                                    values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
-                                }
-                            }
-                            projectedTable.newKey(ptr, values);
-                            PRow row = projectedTable.newRow(kvBuilder, ts, 
ptr, false);
-                            for (; i < projectedColumns.size(); i++) {
-                                Expression expression = 
selectExpressions.get(i - bucketNumOffset);
-                                if (expression.evaluate(result, ptr)) {
-                                    PColumn column = projectedColumns.get(i);
-                                    if 
(!column.getDataType().isSizeCompatible(ptr, null,
-                                        expression.getDataType(), 
expression.getSortOrder(),
-                                        expression.getMaxLength(), 
expression.getScale(),
-                                        column.getMaxLength(), 
column.getScale())) {
-                                        throw new DataExceedsCapacityException(
-                                                column.getDataType(),
-                                                column.getMaxLength(),
-                                                column.getScale(),
-                                                column.getName().getString());
-                                    }
-                                    column.getDataType().coerceBytes(ptr, null,
-                                        expression.getDataType(), 
expression.getMaxLength(),
-                                        expression.getScale(), 
expression.getSortOrder(), 
-                                        column.getMaxLength(), 
column.getScale(),
-                                        column.getSortOrder(), 
projectedTable.rowKeyOrderOptimizable());
-                                    byte[] bytes = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
-                                    row.setValue(column, bytes);
-                                }
-                            }
-                            for (Mutation mutation : row.toRowMutations()) {
-                                if (replayMutations != null) {
-                                    mutation.setAttribute(REPLAY_WRITES, 
replayMutations);
-                                } else if (txnProvider != null && 
projectedTable.getType() == PTableType.INDEX) {
-                                    mutation = 
txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
-                                }
-                                mutations.add(mutation);
-                            }
-                            for (i = 0; i < selectExpressions.size(); i++) {
-                                selectExpressions.get(i).reset();
-                            }
-                        } else if (deleteCF != null && deleteCQ != null) {
-                            // No need to search for delete column, since we 
project only it
-                            // if no empty key value is being set
-                            if (emptyCF == null ||
-                                    result.getValue(deleteCF, deleteCQ) != 
null) {
-                                Delete delete = new 
Delete(results.get(0).getRowArray(),
-                                    results.get(0).getRowOffset(),
-                                    results.get(0).getRowLength());
-                                delete.deleteColumns(deleteCF,  deleteCQ, ts);
-                                // force tephra to ignore this deletes
-                                
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
-                                mutations.add(delete);
-                            }
-                        }
-                        if (emptyCF != null) {
-                            /*
-                             * If we've specified an emptyCF, then we need to 
insert an empty
-                             * key value "retroactively" for any key value 
that is visible at
-                             * the timestamp that the DDL was issued. Key 
values that are not
-                             * visible at this timestamp will not ever be 
projected up to
-                             * scans past this timestamp, so don't need to be 
considered.
-                             * We insert one empty key value per row per 
timestamp.
-                             */
-                            Set<Long> timeStamps =
-                                    
Sets.newHashSetWithExpectedSize(results.size());
-                            for (Cell kv : results) {
-                                long kvts = kv.getTimestamp();
-                                if (!timeStamps.contains(kvts)) {
-                                    Put put = new Put(kv.getRowArray(), 
kv.getRowOffset(),
-                                        kv.getRowLength());
-                                    put.addColumn(emptyCF, 
QueryConstants.EMPTY_COLUMN_BYTES, kvts,
-                                        ByteUtil.EMPTY_BYTE_ARRAY);
-                                    mutations.add(put);
-                                }
-                            }
-                        }
-                        if (ServerUtil.readyToCommit(mutations.size(), 
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commit(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr,
-                                txState, targetHTable, useIndexProto, 
isPKChanging, clientVersionBytes);
-                            mutations.clear();
-                        }
-                        // Commit in batches based on 
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-
-                        if (ServerUtil.readyToCommit(indexMutations.size(), 
indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            setIndexAndTransactionProperties(indexMutations, 
indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
-                            commitBatch(region, indexMutations, 
blockingMemStoreSize);
-                            indexMutations.clear();
-                        }
-                        aggregators.aggregate(rowAggregators, result);
-                        hasAny = true;
-                    }
-                } while (hasMore);
-                if (!mutations.isEmpty()) {
-                    commit(region, mutations, indexUUID, blockingMemStoreSize, 
indexMaintainersPtr, txState,
-                        targetHTable, useIndexProto, isPKChanging, 
clientVersionBytes);
-                    mutations.clear();
-                }
-
-                if (!indexMutations.isEmpty()) {
-                    commitBatch(region, indexMutations, blockingMemStoreSize);
-                    indexMutations.clear();
-                }
-            }
-        } finally {
-            if (needToWrite && incrScanRefCount) {
-                synchronized (lock) {
-                    scansReferenceCount--;
-                    if (scansReferenceCount < 0) {
-                        LOGGER.warn(
-                            "Scan reference count went below zero. Something 
isn't correct. Resetting it back to zero");
-                        scansReferenceCount = 0;
-                    }
-                    lock.notifyAll();
-                }
-            }
-            try {
-                if (targetHTable != null) {
-                    try {
-                        targetHTable.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Closing table: " + targetHTable + " 
failed: ", e);
-                    }
-                }
-            } finally {
-                try {
-                    innerScanner.close();
-                } finally {
-                    if (acquiredLock) region.closeRegionOperation();
-                }
-            }
-        }
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(LogUtil.addCustomAnnotations("Finished scanning " + 
rowCount + " rows for ungrouped coprocessor scan " + scan, 
ScanUtil.getCustomAnnotations(scan)));
+                    getWrappedScanner(c, theScanner, offset, scan, 
dataColumns, tupleProjector,
+                            region, indexMaintainers == null ? null : 
indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
         }
 
-        final boolean hadAny = hasAny;
-        KeyValue keyValue = null;
-        if (hadAny) {
-            byte[] value = aggregators.toBytes(rowAggregators);
-            keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+        if (j != null)  {
+            theScanner = new HashJoinRegionScanner(theScanner, p, j, 
ScanUtil.getTenantId(scan), env, useQualifierAsIndex, 
useNewValueColumnQualifier);
         }
-        final KeyValue aggKeyValue = keyValue;
-
-        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
-            private boolean done = !hadAny;
-
-            @Override
-            public boolean isFilterDone() {
-                return done;
-            }
-
-            @Override
-            public boolean next(List<Cell> results) throws IOException {
-                if (done) return false;
-                done = true;
-                results.add(aggKeyValue);
-                return false;
-            }
-
-            @Override
-            public long getMaxResultSize() {
-                return scan.getMaxResultSize();
-            }
-        };
+        RegionScanner scanner = new UngroupedAggregateRegionScanner(c, 
theScanner,region, scan, env, this);

Review comment:
       Yes, you are right.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Paged server side ungrouped aggregate operations 
> -------------------------------------------------
>
>                 Key: PHOENIX-5998
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-5998
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Kadir OZDEMIR
>            Assignee: Kadir OZDEMIR
>            Priority: Major
>             Fix For: 4.16.0
>
>         Attachments: PHOENIX-5998.4.x.001.patch, PHOENIX-5998.4.x.002.patch, 
> PHOENIX-5998.4.x.003.patch
>
>
> Phoenix provides the option of performing upsert select and delete query 
> operations on the client or server side.  This is decided by the Phoenix 
> optimizer based on configuration parameters. For the server side option, the 
> table operation (upsert select/delete query) is parallelized such that 
> multiple table regions are scanned and the mutations derived from these scans 
> can also be executed in parallel on the server side. However, currently there 
> is no paging capability and the server side operation can take long enough 
> to lead to HBase client timeouts. When this happens, Phoenix can return failure 
> to its applications and the rest of the parallel scans and mutations on the 
> server side can still continue since Phoenix has no mechanism in place to 
> stop these operations before returning failure to applications. This can 
> create unexpected race conditions between these left-over operations and the 
> new operations issued by applications. Putting a limit on the number of rows 
> to be processed within a single RPC call (i.e., the next operation on the 
> scanner) on the server side using a Phoenix level paging is highly desirable 
> and a required step to prevent the possible race conditions. This paging 
> mechanism has already been implemented for index rebuild and verification 
> operations and has proven effective at preventing timeouts. This paging can be 
> implemented for all server side operations including aggregates, upsert 
> selects, delete queries and so on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to