ChinmaySKulkarni commented on a change in pull request #936:
URL: https://github.com/apache/phoenix/pull/936#discussion_r520195446



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -268,7 +244,7 @@ public void doMutation() throws IOException {
         }
     }
 
-    private void commitBatch(Region region, List<Mutation> mutations, long 
blockingMemstoreSize) throws IOException {
+    public void commitBatch(Region region, List<Mutation> mutations, long 
blockingMemstoreSize) throws IOException {

Review comment:
       Isn't package-private access sufficient for all these APIs since I guess 
they are only being called from classes within the package?

##########
File path: phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
##########
@@ -629,6 +629,9 @@ public static Configuration 
setUpConfigForMiniCluster(Configuration conf, ReadOn
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
         conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+        if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS, 
0) == 0) {
+            conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS, 
2);

Review comment:
       In tests we don't generally read too many rows anyways, so won't setting 
this config just unnecessarily slow them down?

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
##########
@@ -33,21 +34,37 @@
     public UngroupedAggregatingResultIterator( PeekingResultIterator 
resultIterator, Aggregators aggregators) {
         super(resultIterator, aggregators);
     }
-    
     @Override
     public Tuple next() throws SQLException {
-        Tuple result = super.next();
-        // Ensure ungrouped aggregregation always returns a row, even if the 
underlying iterator doesn't.
-        if (result == null && !hasRows) {
-            // We should reset ClientAggregators here in case they are being 
reused in a new ResultIterator.
-            aggregators.reset(aggregators.getAggregators());
-            byte[] value = aggregators.toBytes(aggregators.getAggregators());
-            result = new SingleKeyValueTuple(
-                    KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
-                            SINGLE_COLUMN_FAMILY, 
-                            SINGLE_COLUMN, 
-                            AGG_TIMESTAMP, 
-                            value));
+        Tuple result = resultIterator.next();
+        if (result == null) {
+            // Ensure ungrouped aggregregation always returns a row, even if 
the underlying iterator doesn't.
+            if (!hasRows) {
+                // We should reset ClientAggregators here in case they are 
being reused in a new ResultIterator.
+                aggregators.reset(aggregators.getAggregators());
+                byte[] value = 
aggregators.toBytes(aggregators.getAggregators());
+                result = new SingleKeyValueTuple(
+                        KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+                                SINGLE_COLUMN_FAMILY,
+                                SINGLE_COLUMN,
+                                AGG_TIMESTAMP,
+                                value));
+            }
+        } else {
+            Aggregator[] rowAggregators = aggregators.getAggregators();
+            aggregators.reset(rowAggregators);
+            while (true) {
+                aggregators.aggregate(rowAggregators, result);
+                Tuple nextResult = resultIterator.peek();
+                if (nextResult == null) {
+                    break;
+                }
+                result = resultIterator.next();

Review comment:
       Essentially, the server-side paging just means that more number of 
iterations of this `while (true)` are needed on the client-side to page over 
all of the data, correct?

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
##########
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
+
+    protected long pageSizeInRows = Long.MAX_VALUE;
+    protected int maxBatchSize = 0;
+    protected Scan scan;
+    protected RegionScanner innerScanner;
+    protected Region region;
+    private final UngroupedAggregateRegionObserver 
ungroupedAggregateRegionObserver;
+    final private RegionCoprocessorEnvironment env;
+    private final boolean useQualifierAsIndex;
+    boolean needToWrite = false;
+    final Pair<Integer, Integer> minMaxQualifiers;
+    byte[][] values = null;
+    PTable.QualifierEncodingScheme encodingScheme;
+    PTable writeToTable = null;
+    PTable projectedTable = null;
+    boolean isDescRowKeyOrderUpgrade;
+    final int offset;
+    boolean buildLocalIndex;
+    List<IndexMaintainer> indexMaintainers;
+    boolean isPKChanging = false;
+    long ts;
+    PhoenixTransactionProvider txnProvider = null;
+    UngroupedAggregateRegionObserver.MutationList indexMutations;
+    boolean isDelete = false;
+    byte[] replayMutations;
+    boolean isUpsert = false;
+    List<Expression> selectExpressions = null;
+    byte[] deleteCQ = null;
+    byte[] deleteCF = null;
+    byte[] emptyCF = null;
+    byte[] indexUUID;
+    byte[] txState;
+    byte[] clientVersionBytes;
+    long blockingMemStoreSize;
+    long maxBatchSizeBytes = 0L;
+    HTable targetHTable = null;
+    boolean incrScanRefCount = false;
+    byte[] indexMaintainersPtr;
+    boolean useIndexProto;
+
+    public UngroupedAggregateRegionScanner(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+                                           final RegionScanner innerScanner, 
final Region region, final Scan scan,
+                                           final RegionCoprocessorEnvironment 
env,
+                                           final 
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException, SQLException{
+        super(innerScanner);
+        this.env = env;
+        this.region = region;
+        this.scan = scan;
+        this.ungroupedAggregateRegionObserver = 
ungroupedAggregateRegionObserver;
+        this.innerScanner = innerScanner;
+        Configuration conf = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != 
null) {
+            byte[] pageSizeFromScan =
+                    
scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_ROWS);
+            if (pageSizeFromScan != null) {
+                pageSizeInRows = Bytes.toLong(pageSizeFromScan);
+            } else {
+                pageSizeInRows =
+                        conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS,
+                                
QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS);
+            }
+        }
+        ts = scan.getTimeRange().getMax();
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+
+        encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+        int offsetToBe = 0;
+        if (localIndexScan) {
+            /*
+             * For local indexes, we need to set an offset on row key 
expressions to skip
+             * the region start key.
+             */
+            offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? 
region.getRegionInfo().getStartKey().length :
+                    region.getRegionInfo().getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offsetToBe);
+        }
+        offset = offsetToBe;
+
+        byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
+        isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
+        if (isDescRowKeyOrderUpgrade) {
+            LOGGER.debug("Upgrading row key for " + 
region.getRegionInfo().getTable().getNameAsString());
+            projectedTable = deserializeTable(descRowKeyTableBytes);
+            try {
+                writeToTable = PTableImpl.builderWithColumns(projectedTable,
+                        getColumnsToClone(projectedTable))
+                        .setRowKeyOrderOptimizable(true)
+                        .build();
+            } catch (SQLException e) {
+                ServerUtil.throwIOException("Upgrade failed", e); // Impossible
+            }
+            values = new byte[projectedTable.getPKColumns().size()][];
+        }
+        boolean useProto = false;
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+        useProto = localIndexBytes != null;
+        if (localIndexBytes == null) {
+            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        }
+        indexMaintainers = localIndexBytes == null ? null : 
IndexMaintainer.deserialize(localIndexBytes, useProto);
+        indexMutations = localIndexBytes == null ? new 
UngroupedAggregateRegionObserver.MutationList() : new 
UngroupedAggregateRegionObserver.MutationList(1024);
+
+        replayMutations = scan.getAttribute(REPLAY_WRITES);
+        indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+        clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        if (txState != null) {
+            int clientVersion = clientVersionBytes == null ? 
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+            txnProvider = TransactionFactory.getTransactionProvider(txState, 
clientVersion);
+        }
+        byte[] upsertSelectTable = 
scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
+        if (upsertSelectTable != null) {
+            isUpsert = true;
+            projectedTable = deserializeTable(upsertSelectTable);
+            targetHTable = new 
HTable(ungroupedAggregateRegionObserver.getUpsertSelectConfig(),
+                    projectedTable.getPhysicalName().getBytes());
+            selectExpressions = 
deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
+            values = new byte[projectedTable.getPKColumns().size()][];
+            isPKChanging = ExpressionUtil.isPkPositionChanging(new 
TableRef(projectedTable), selectExpressions);
+        } else {
+            byte[] isDeleteAgg = 
scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
+            isDelete = isDeleteAgg != null && 
Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            if (!isDelete) {
+                deleteCF = 
scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
+                deleteCQ = 
scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
+            }
+            emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+        }
+        ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+
+        /**
+         * Slow down the writes if the memstore size more than
+         * (hbase.hregion.memstore.block.multiplier - 1) times 
hbase.hregion.memstore.flush.size
+         * bytes. This avoids flush storm to hdfs for cases like index 
building where reads and
+         * write happen to all the table regions in the server.
+         */
+        blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
+
+        buildLocalIndex = indexMaintainers != null && dataColumns==null && 
!localIndexScan;
+        if(buildLocalIndex) {
+            checkForLocalIndexColumnFamilies(region, indexMaintainers);
+        }
+        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+                || (deleteCQ != null && deleteCF != null) || emptyCF != null 
|| buildLocalIndex) {
+            needToWrite = true;
+            if((isUpsert && (targetHTable == null ||
+                    
!targetHTable.getName().equals(region.getTableDesc().getTableName())))) {
+                needToWrite = false;
+            }
+            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        }
+        minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " " + region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
+        }
+        useIndexProto = true;
+        indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        // for backward compatiblity fall back to look by the old attribute
+        if (indexMaintainersPtr == null) {
+            indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            useIndexProto = false;
+        }
+
+        if (needToWrite) {
+            ungroupedAggregateRegionObserver.incrementScansReferenceCount();
+            incrScanRefCount = true;
+        }
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (needToWrite && incrScanRefCount) {
+            ungroupedAggregateRegionObserver.decrementScansReferenceCount();
+        }
+        try {
+            if (targetHTable != null) {
+                try {
+                    targetHTable.close();
+                } catch (IOException e) {
+                    LOGGER.error("Closing table: " + targetHTable + " failed: 
", e);
+                }
+            }
+        } finally {
+            innerScanner.close();
+        }
+    }
+
+    boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable 
ptr,
+                                UngroupedAggregateRegionObserver.MutationList 
mutations) throws IOException {
+        Arrays.fill(values, null);
+        Cell firstKV = results.get(0);
+        RowKeySchema schema = projectedTable.getRowKeySchema();
+        int maxOffset = schema.iterator(firstKV.getRowArray(), 
firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            Boolean hasValue = schema.next(ptr, i, maxOffset);
+            if (hasValue == null) {
+                break;
+            }
+            ValueSchema.Field field = schema.getField(i);
+            if (field.getSortOrder() == SortOrder.DESC) {
+                // Special case for re-writing DESC ARRAY, as the actual byte 
value needs to change in this case
+                if (field.getDataType().isArrayType()) {
+                    field.getDataType().coerceBytes(ptr, null, 
field.getDataType(),
+                            field.getMaxLength(), field.getScale(), 
field.getSortOrder(),
+                            field.getMaxLength(), field.getScale(), 
field.getSortOrder(), true); // force to use correct separator byte
+                }
+                // Special case for re-writing DESC CHAR or DESC BINARY, to 
force the re-writing of trailing space characters
+                else if (field.getDataType() == PChar.INSTANCE || 
field.getDataType() == PBinary.INSTANCE) {
+                    int len = ptr.getLength();
+                    while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == 
StringUtil.SPACE_UTF8) {
+                        len--;
+                    }
+                    ptr.set(ptr.get(), ptr.getOffset(), len);
+                    // Special case for re-writing DESC FLOAT and DOUBLE, as 
they're not inverted like they should be (PHOENIX-2171)
+                } else if (field.getDataType() == PFloat.INSTANCE || 
field.getDataType() == PDouble.INSTANCE) {
+                    byte[] invertedBytes = SortOrder.invert(ptr.get(), 
ptr.getOffset(), ptr.getLength());
+                    ptr.set(invertedBytes);
+                }
+            } else if (field.getDataType() == PBinary.INSTANCE) {
+                // Remove trailing space characters so that the setValues call 
below will replace them
+                // with the correct zero byte character. Note this is somewhat 
dangerous as these
+                // could be legit, but I don't know what the alternative is.
+                int len = ptr.getLength();
+                while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == 
StringUtil.SPACE_UTF8) {
+                    len--;
+                }
+                ptr.set(ptr.get(), ptr.getOffset(), len);
+            }
+            values[i] = ptr.copyBytes();
+        }
+        writeToTable.newKey(ptr, values);
+        if (Bytes.compareTo(
+                firstKV.getRowArray(), firstKV.getRowOffset() + offset, 
firstKV.getRowLength(),
+                ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
+            return false;
+        }
+        byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        if (offset > 0) { // for local indexes (prepend region start key)
+            byte[] newRowWithOffset = new byte[offset + newRow.length];
+            System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), 
newRowWithOffset, 0, offset);;
+            System.arraycopy(newRow, 0, newRowWithOffset, offset, 
newRow.length);
+            newRow = newRowWithOffset;
+        }
+        byte[] oldRow = Bytes.copy(firstKV.getRowArray(), 
firstKV.getRowOffset(), firstKV.getRowLength());
+        for (Cell cell : results) {
+            // Copy existing cell but with new row key
+            Cell newCell = new KeyValue(newRow, 0, newRow.length,
+                    cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(),
+                    cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength(),
+                    cell.getTimestamp(), 
KeyValue.Type.codeToType(cell.getTypeByte()),
+                    cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+            switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                case Put:
+                    // If Put, point delete old Put
+                    Delete del = new Delete(oldRow);
+                    del.addDeleteMarker(new KeyValue(cell.getRowArray(), 
cell.getRowOffset(), cell.getRowLength(),
+                            cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(),
+                            cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                            cell.getQualifierLength(), cell.getTimestamp(), 
KeyValue.Type.Delete,
+                            ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
+                    mutations.add(del);
+
+                    Put put = new Put(newRow);
+                    put.add(newCell);
+                    mutations.add(put);
+                    break;
+                case Delete:
+                case DeleteColumn:
+                case DeleteFamily:
+                case DeleteFamilyVersion:
+                    Delete delete = new Delete(newRow);
+                    delete.addDeleteMarker(newCell);
+                    mutations.add(delete);
+                    break;
+            }
+        }
+        return true;
+    }
+
+    void buildLocalIndex(Tuple result, List<Cell> results, 
ImmutableBytesWritable ptr,
+                         UngroupedAggregateRegionObserver.MutationList 
mutations) throws IOException {
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            if (!results.isEmpty()) {
+                result.getKey(ptr);
+                ValueGetter valueGetter =
+                        maintainer.createGetterFromKeyValues(
+                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+                                results);
+                Put put = 
maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                        valueGetter, ptr, results.get(0).getTimestamp(),
+                        env.getRegion().getRegionInfo().getStartKey(),
+                        env.getRegion().getRegionInfo().getEndKey());
+
+                if (txnProvider != null) {
+                    put = txnProvider.markPutAsCommitted(put, ts, ts);
+                }
+                indexMutations.add(put);
+            }
+        }
+        result.setKeyValues(results);
+    }
+    void deleteRow(List<Cell> results, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        // FIXME: the version of the Delete constructor without the lock
+        // args was introduced in 0.94.4, thus if we try to use it here
+        // we can no longer use the 0.94.2 version of the client.
+        Cell firstKV = results.get(0);
+        Delete delete = new Delete(firstKV.getRowArray(),
+                firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+        if (replayMutations != null) {
+            delete.setAttribute(REPLAY_WRITES, replayMutations);
+        }
+        mutations.add(delete);
+        // force tephra to ignore this deletes
+        
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+    }
+
+    void deleteCForQ(Tuple result, List<Cell> results, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        // No need to search for delete column, since we project only it
+        // if no empty key value is being set
+        if (emptyCF == null ||
+                result.getValue(deleteCF, deleteCQ) != null) {
+            Delete delete = new Delete(results.get(0).getRowArray(),
+                    results.get(0).getRowOffset(),
+                    results.get(0).getRowLength());
+            delete.deleteColumns(deleteCF,  deleteCQ, ts);
+            // force tephra to ignore this deletes
+            
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+            mutations.add(delete);
+        }
+    }
+    void upsert(Tuple result, ImmutableBytesWritable ptr, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        Arrays.fill(values, null);
+        int bucketNumOffset = 0;
+        if (projectedTable.getBucketNum() != null) {
+            values[0] = new byte[] { 0 };
+            bucketNumOffset = 1;
+        }
+        int i = bucketNumOffset;
+        List<PColumn> projectedColumns = projectedTable.getColumns();
+        for (; i < projectedTable.getPKColumns().size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                values[i] = ptr.copyBytes();
+                // If SortOrder from expression in SELECT doesn't match the
+                // column being projected into then invert the bits.
+                if (expression.getSortOrder() !=
+                        projectedColumns.get(i).getSortOrder()) {
+                    SortOrder.invert(values[i], 0, values[i], 0,
+                            values[i].length);
+                }
+            }else{
+                values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        projectedTable.newKey(ptr, values);
+        PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts, 
ptr, false);
+        for (; i < projectedColumns.size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                PColumn column = projectedColumns.get(i);
+                if (!column.getDataType().isSizeCompatible(ptr, null,
+                        expression.getDataType(), expression.getSortOrder(),
+                        expression.getMaxLength(), expression.getScale(),
+                        column.getMaxLength(), column.getScale())) {
+                    throw new DataExceedsCapacityException(
+                            column.getDataType(), column.getMaxLength(),
+                            column.getScale(), column.getName().getString(), 
ptr);
+                }
+                column.getDataType().coerceBytes(ptr, null,
+                        expression.getDataType(), expression.getMaxLength(),
+                        expression.getScale(), expression.getSortOrder(),
+                        column.getMaxLength(), column.getScale(),
+                        column.getSortOrder(), 
projectedTable.rowKeyOrderOptimizable());
+                byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                row.setValue(column, bytes);
+            }
+        }
+        for (Mutation mutation : row.toRowMutations()) {
+            if (replayMutations != null) {
+                mutation.setAttribute(REPLAY_WRITES, replayMutations);
+            } else if (txnProvider != null && projectedTable.getType() == 
PTableType.INDEX) {
+                mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, 
ts);
+            }
+            mutations.add(mutation);
+        }
+        for (i = 0; i < selectExpressions.size(); i++) {
+            selectExpressions.get(i).reset();
+        }
+    }
+
+    void insertEmptyKeyValue(List<Cell> results, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        Set<Long> timeStamps =
+                Sets.newHashSetWithExpectedSize(results.size());
+        for (Cell kv : results) {
+            long kvts = kv.getTimestamp();
+            if (!timeStamps.contains(kvts)) {
+                Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+                        kv.getRowLength());
+                put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+                        ByteUtil.EMPTY_BYTE_ARRAY);
+                mutations.add(put);
+            }
+        }
+    }
+    @Override
+    public boolean next(List<Cell> resultsToReturn) throws IOException {

Review comment:
       Just a generic question: In case the RS dies and the server-side scanner 
dies with it, when a new scanner starts its work again it will start from row 
0. However, can it occur that the previous scanner had returned rows to the 
client upon which the client issued DELETE/UPSERT operations corresponding to 
the UPSERT SELECT/DELETE? Does something inside UARO prevent this?

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
##########
@@ -342,6 +342,7 @@
     public static final long 
DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 
days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
     public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
+    public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS = 
32*1024;

Review comment:
       How did we arrive at 32768 rows? Since we aren't using a notion of row 
size for paging, but rather absolute "number of rows", can we replace the 1024 
with say 1000 just to avoid confusion (one may think this is 32KB total row 
size)?

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
##########
@@ -134,6 +137,7 @@ public TableResultIterator(MutationState mutationState, 
Scan scan, ScanMetricsHo
         
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
         IndexUtil.setScanAttributesForIndexReadRepair(scan, table, 
plan.getContext().getConnection());
+        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES);

Review comment:
       Can there be cases where SERVER_PAGING is not desirable? This change 
sort of enforces all queries to use paging. Wondering if it is worth 
introducing a client-side config to decide whether to set this attribute to 
True/False.

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -516,402 +437,19 @@ public RegionScanner run() throws Exception {
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
-                    getWrappedScanner(c, theScanner, offset, scan, 
dataColumns, tupleProjector, 
-                        region, indexMaintainers == null ? null : 
indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
-        } 
-        
-        if (j != null)  {
-            theScanner = new HashJoinRegionScanner(theScanner, p, j, 
ScanUtil.getTenantId(scan), env, useQualifierAsIndex, 
useNewValueColumnQualifier);
-        }
-        
-        int maxBatchSize = 0;
-        long maxBatchSizeBytes = 0L;
-        MutationList mutations = new MutationList();
-        boolean needToWrite = false;
-        Configuration conf = env.getConfiguration();
-
-        /**
-         * Slow down the writes if the memstore size more than
-         * (hbase.hregion.memstore.block.multiplier - 1) times 
hbase.hregion.memstore.flush.size
-         * bytes. This avoids flush storm to hdfs for cases like index 
building where reads and
-         * write happen to all the table regions in the server.
-         */
-        final long blockingMemStoreSize = getBlockingMemstoreSize(region, 
conf) ;
-
-        boolean buildLocalIndex = indexMaintainers != null && 
dataColumns==null && !localIndexScan;
-        if(buildLocalIndex) {
-            checkForLocalIndexColumnFamilies(region, indexMaintainers);
-        }
-        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
-                || (deleteCQ != null && deleteCF != null) || emptyCF != null 
|| buildLocalIndex) {
-            needToWrite = true;
-            if((isUpsert && (targetHTable == null ||
-                    
!targetHTable.getName().equals(region.getTableDesc().getTableName())))) {
-                needToWrite = false;
-            }
-            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            mutations = new MutationList(Ints.saturatedCast(maxBatchSize + 
maxBatchSize / 10));
-            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-        }
-        boolean hasMore;
-        int rowCount = 0;
-        boolean hasAny = false;
-        boolean acquiredLock = false;
-        boolean incrScanRefCount = false;
-        Aggregators aggregators = null;
-        Aggregator[] rowAggregators = null;
-        final RegionScanner innerScanner = theScanner;
-        final TenantCache tenantCache = GlobalCache.getTenantCache(env, 
ScanUtil.getTenantId(scan));
-        try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
-            aggregators = ServerAggregators.deserialize(
-                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
conf, em);
-            rowAggregators = aggregators.getAggregators();
-            Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " "+region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
-            }
-            boolean useIndexProto = true;
-            byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-            // for backward compatiblity fall back to look by the old attribute
-            if (indexMaintainersPtr == null) {
-                indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
-                useIndexProto = false;
-            }
-    
-            if(needToWrite) {
-                synchronized (lock) {
-                    if (isRegionClosingOrSplitting) {
-                        throw new IOException("Temporarily unable to write 
from scan because region is closing or splitting");
-                    }
-                    scansReferenceCount++;
-                    incrScanRefCount = true;
-                    lock.notifyAll();
-                }
-            }
-            region.startRegionOperation();
-            acquiredLock = true;
-            synchronized (innerScanner) {
-                do {
-                    List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
-                    // Results are potentially returned even when the return 
value of s.next is false
-                    // since this is an indication of whether or not there are 
more values after the
-                    // ones returned
-                    hasMore = innerScanner.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        rowCount++;
-                        result.setKeyValues(results);
-                        if (isDescRowKeyOrderUpgrade) {
-                            Arrays.fill(values, null);
-                            Cell firstKV = results.get(0);
-                            RowKeySchema schema = 
projectedTable.getRowKeySchema();
-                            int maxOffset = 
schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, 
firstKV.getRowLength(), ptr);
-                            for (int i = 0; i < schema.getFieldCount(); i++) {
-                                Boolean hasValue = schema.next(ptr, i, 
maxOffset);
-                                if (hasValue == null) {
-                                    break;
-                                }
-                                Field field = schema.getField(i);
-                                if (field.getSortOrder() == SortOrder.DESC) {
-                                    // Special case for re-writing DESC ARRAY, 
as the actual byte value needs to change in this case
-                                    if (field.getDataType().isArrayType()) {
-                                        field.getDataType().coerceBytes(ptr, 
null, field.getDataType(),
-                                            field.getMaxLength(), 
field.getScale(), field.getSortOrder(), 
-                                            field.getMaxLength(), 
field.getScale(), field.getSortOrder(), true); // force to use correct 
separator byte
-                                    }
-                                    // Special case for re-writing DESC CHAR 
or DESC BINARY, to force the re-writing of trailing space characters
-                                    else if (field.getDataType() == 
PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
-                                        int len = ptr.getLength();
-                                        while (len > 0 && 
ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
-                                            len--;
-                                        }
-                                        ptr.set(ptr.get(), ptr.getOffset(), 
len);
-                                        // Special case for re-writing DESC 
FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
-                                    } else if (field.getDataType() == 
PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
-                                        byte[] invertedBytes = 
SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
-                                        ptr.set(invertedBytes);
-                                    }
-                                } else if (field.getDataType() == 
PBinary.INSTANCE) {
-                                    // Remove trailing space characters so 
that the setValues call below will replace them
-                                    // with the correct zero byte character. 
Note this is somewhat dangerous as these
-                                    // could be legit, but I don't know what 
the alternative is.
-                                    int len = ptr.getLength();
-                                    while (len > 0 && 
ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
-                                        len--;
-                                    }
-                                    ptr.set(ptr.get(), ptr.getOffset(), len);  
                                      
-                                }
-                                values[i] = ptr.copyBytes();
-                            }
-                            writeToTable.newKey(ptr, values);
-                            if (Bytes.compareTo(
-                                firstKV.getRowArray(), firstKV.getRowOffset() 
+ offset, firstKV.getRowLength(), 
-                                ptr.get(),ptr.getOffset() + 
offset,ptr.getLength()) == 0) {
-                                continue;
-                            }
-                            byte[] newRow = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
-                            if (offset > 0) { // for local indexes (prepend 
region start key)
-                                byte[] newRowWithOffset = new byte[offset + 
newRow.length];
-                                System.arraycopy(firstKV.getRowArray(), 
firstKV.getRowOffset(), newRowWithOffset, 0, offset);;
-                                System.arraycopy(newRow, 0, newRowWithOffset, 
offset, newRow.length);
-                                newRow = newRowWithOffset;
-                            }
-                            byte[] oldRow = Bytes.copy(firstKV.getRowArray(), 
firstKV.getRowOffset(), firstKV.getRowLength());
-                            for (Cell cell : results) {
-                                // Copy existing cell but with new row key
-                                Cell newCell = new KeyValue(newRow, 0, 
newRow.length,
-                                    cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength(),
-                                    cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
-                                    cell.getTimestamp(), 
KeyValue.Type.codeToType(cell.getTypeByte()),
-                                    cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
-                                switch 
(KeyValue.Type.codeToType(cell.getTypeByte())) {
-                                case Put:
-                                    // If Put, point delete old Put
-                                    Delete del = new Delete(oldRow);
-                                    del.addDeleteMarker(new 
KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-                                        cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength(),
-                                        cell.getQualifierArray(), 
cell.getQualifierOffset(),
-                                        cell.getQualifierLength(), 
cell.getTimestamp(), KeyValue.Type.Delete,
-                                        ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
-                                    mutations.add(del);
-
-                                    Put put = new Put(newRow);
-                                    put.add(newCell);
-                                    mutations.add(put);
-                                    break;
-                                case Delete:
-                                case DeleteColumn:
-                                case DeleteFamily:
-                                case DeleteFamilyVersion:
-                                    Delete delete = new Delete(newRow);
-                                    delete.addDeleteMarker(newCell);
-                                    mutations.add(delete);
-                                    break;
-                                }
-                            }
-                        } else if (buildLocalIndex) {
-                            for (IndexMaintainer maintainer : 
indexMaintainers) {
-                                if (!results.isEmpty()) {
-                                    result.getKey(ptr);
-                                    ValueGetter valueGetter =
-                                            
maintainer.createGetterFromKeyValues(
-                                                
ImmutableBytesPtr.copyBytesIfNecessary(ptr),
-                                                results);
-                                    Put put = 
maintainer.buildUpdateMutation(kvBuilder,
-                                        valueGetter, ptr, 
results.get(0).getTimestamp(),
-                                        
env.getRegion().getRegionInfo().getStartKey(),
-                                        
env.getRegion().getRegionInfo().getEndKey());
-
-                                    if (txnProvider != null) {
-                                        put = 
txnProvider.markPutAsCommitted(put, ts, ts);
-                                    }
-                                    indexMutations.add(put);
-                                }
-                            }
-                            result.setKeyValues(results);
-                        } else if (isDelete) {
-                            // FIXME: the version of the Delete constructor 
without the lock
-                            // args was introduced in 0.94.4, thus if we try 
to use it here
-                            // we can no longer use the 0.94.2 version of the 
client.
-                            Cell firstKV = results.get(0);
-                            Delete delete = new Delete(firstKV.getRowArray(),
-                                firstKV.getRowOffset(), 
firstKV.getRowLength(),ts);
-                            if (replayMutations != null) {
-                                delete.setAttribute(REPLAY_WRITES, 
replayMutations);
-                            }
-                            mutations.add(delete);
-                            // force tephra to ignore this deletes
-                            
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
-                        } else if (isUpsert) {
-                            Arrays.fill(values, null);
-                            int bucketNumOffset = 0;
-                            if (projectedTable.getBucketNum() != null) {
-                                values[0] = new byte[] { 0 };
-                                bucketNumOffset = 1;
-                            }
-                            int i = bucketNumOffset;
-                            List<PColumn> projectedColumns = 
projectedTable.getColumns();
-                            for (; i < projectedTable.getPKColumns().size(); 
i++) {
-                                Expression expression = 
selectExpressions.get(i - bucketNumOffset);
-                                if (expression.evaluate(result, ptr)) {
-                                    values[i] = ptr.copyBytes();
-                                    // If SortOrder from expression in SELECT 
doesn't match the
-                                    // column being projected into then invert 
the bits.
-                                    if (expression.getSortOrder() !=
-                                            
projectedColumns.get(i).getSortOrder()) {
-                                        SortOrder.invert(values[i], 0, 
values[i], 0,
-                                            values[i].length);
-                                    }
-                                }else{
-                                    values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
-                                }
-                            }
-                            projectedTable.newKey(ptr, values);
-                            PRow row = projectedTable.newRow(kvBuilder, ts, 
ptr, false);
-                            for (; i < projectedColumns.size(); i++) {
-                                Expression expression = 
selectExpressions.get(i - bucketNumOffset);
-                                if (expression.evaluate(result, ptr)) {
-                                    PColumn column = projectedColumns.get(i);
-                                    if 
(!column.getDataType().isSizeCompatible(ptr, null,
-                                        expression.getDataType(), 
expression.getSortOrder(),
-                                        expression.getMaxLength(), 
expression.getScale(),
-                                        column.getMaxLength(), 
column.getScale())) {
-                                        throw new DataExceedsCapacityException(
-                                                column.getDataType(),
-                                                column.getMaxLength(),
-                                                column.getScale(),
-                                                column.getName().getString());
-                                    }
-                                    column.getDataType().coerceBytes(ptr, null,
-                                        expression.getDataType(), 
expression.getMaxLength(),
-                                        expression.getScale(), 
expression.getSortOrder(), 
-                                        column.getMaxLength(), 
column.getScale(),
-                                        column.getSortOrder(), 
projectedTable.rowKeyOrderOptimizable());
-                                    byte[] bytes = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
-                                    row.setValue(column, bytes);
-                                }
-                            }
-                            for (Mutation mutation : row.toRowMutations()) {
-                                if (replayMutations != null) {
-                                    mutation.setAttribute(REPLAY_WRITES, 
replayMutations);
-                                } else if (txnProvider != null && 
projectedTable.getType() == PTableType.INDEX) {
-                                    mutation = 
txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
-                                }
-                                mutations.add(mutation);
-                            }
-                            for (i = 0; i < selectExpressions.size(); i++) {
-                                selectExpressions.get(i).reset();
-                            }
-                        } else if (deleteCF != null && deleteCQ != null) {
-                            // No need to search for delete column, since we 
project only it
-                            // if no empty key value is being set
-                            if (emptyCF == null ||
-                                    result.getValue(deleteCF, deleteCQ) != 
null) {
-                                Delete delete = new 
Delete(results.get(0).getRowArray(),
-                                    results.get(0).getRowOffset(),
-                                    results.get(0).getRowLength());
-                                delete.deleteColumns(deleteCF,  deleteCQ, ts);
-                                // force tephra to ignore this deletes
-                                
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
-                                mutations.add(delete);
-                            }
-                        }
-                        if (emptyCF != null) {
-                            /*
-                             * If we've specified an emptyCF, then we need to 
insert an empty
-                             * key value "retroactively" for any key value 
that is visible at
-                             * the timestamp that the DDL was issued. Key 
values that are not
-                             * visible at this timestamp will not ever be 
projected up to
-                             * scans past this timestamp, so don't need to be 
considered.
-                             * We insert one empty key value per row per 
timestamp.
-                             */
-                            Set<Long> timeStamps =
-                                    
Sets.newHashSetWithExpectedSize(results.size());
-                            for (Cell kv : results) {
-                                long kvts = kv.getTimestamp();
-                                if (!timeStamps.contains(kvts)) {
-                                    Put put = new Put(kv.getRowArray(), 
kv.getRowOffset(),
-                                        kv.getRowLength());
-                                    put.addColumn(emptyCF, 
QueryConstants.EMPTY_COLUMN_BYTES, kvts,
-                                        ByteUtil.EMPTY_BYTE_ARRAY);
-                                    mutations.add(put);
-                                }
-                            }
-                        }
-                        if (ServerUtil.readyToCommit(mutations.size(), 
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commit(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr,
-                                txState, targetHTable, useIndexProto, 
isPKChanging, clientVersionBytes);
-                            mutations.clear();
-                        }
-                        // Commit in batches based on 
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-
-                        if (ServerUtil.readyToCommit(indexMutations.size(), 
indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            setIndexAndTransactionProperties(indexMutations, 
indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
-                            commitBatch(region, indexMutations, 
blockingMemStoreSize);
-                            indexMutations.clear();
-                        }
-                        aggregators.aggregate(rowAggregators, result);
-                        hasAny = true;
-                    }
-                } while (hasMore);
-                if (!mutations.isEmpty()) {
-                    commit(region, mutations, indexUUID, blockingMemStoreSize, 
indexMaintainersPtr, txState,
-                        targetHTable, useIndexProto, isPKChanging, 
clientVersionBytes);
-                    mutations.clear();
-                }
-
-                if (!indexMutations.isEmpty()) {
-                    commitBatch(region, indexMutations, blockingMemStoreSize);
-                    indexMutations.clear();
-                }
-            }
-        } finally {
-            if (needToWrite && incrScanRefCount) {
-                synchronized (lock) {
-                    scansReferenceCount--;
-                    if (scansReferenceCount < 0) {
-                        LOGGER.warn(
-                            "Scan reference count went below zero. Something 
isn't correct. Resetting it back to zero");
-                        scansReferenceCount = 0;
-                    }
-                    lock.notifyAll();
-                }
-            }
-            try {
-                if (targetHTable != null) {
-                    try {
-                        targetHTable.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Closing table: " + targetHTable + " 
failed: ", e);
-                    }
-                }
-            } finally {
-                try {
-                    innerScanner.close();
-                } finally {
-                    if (acquiredLock) region.closeRegionOperation();
-                }
-            }
-        }
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(LogUtil.addCustomAnnotations("Finished scanning " + 
rowCount + " rows for ungrouped coprocessor scan " + scan, 
ScanUtil.getCustomAnnotations(scan)));
+                    getWrappedScanner(c, theScanner, offset, scan, 
dataColumns, tupleProjector,
+                            region, indexMaintainers == null ? null : 
indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
         }
 
-        final boolean hadAny = hasAny;
-        KeyValue keyValue = null;
-        if (hadAny) {
-            byte[] value = aggregators.toBytes(rowAggregators);
-            keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+        if (j != null)  {
+            theScanner = new HashJoinRegionScanner(theScanner, p, j, 
ScanUtil.getTenantId(scan), env, useQualifierAsIndex, 
useNewValueColumnQualifier);
         }
-        final KeyValue aggKeyValue = keyValue;
-
-        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
-            private boolean done = !hadAny;
-
-            @Override
-            public boolean isFilterDone() {
-                return done;
-            }
-
-            @Override
-            public boolean next(List<Cell> results) throws IOException {
-                if (done) return false;
-                done = true;
-                results.add(aggKeyValue);
-                return false;
-            }
-
-            @Override
-            public long getMaxResultSize() {
-                return scan.getMaxResultSize();
-            }
-        };
+        RegionScanner scanner = new UngroupedAggregateRegionScanner(c, 
theScanner,region, scan, env, this);

Review comment:
       nit: You can just `return` here instead of using the new variable 
`scanner`

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
##########
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize;
+import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
+
+    protected long pageSizeInRows = Long.MAX_VALUE;
+    protected int maxBatchSize = 0;
+    protected Scan scan;
+    protected RegionScanner innerScanner;
+    protected Region region;
+    private final UngroupedAggregateRegionObserver 
ungroupedAggregateRegionObserver;
+    final private RegionCoprocessorEnvironment env;
+    private final boolean useQualifierAsIndex;
+    boolean needToWrite = false;
+    final Pair<Integer, Integer> minMaxQualifiers;
+    byte[][] values = null;
+    PTable.QualifierEncodingScheme encodingScheme;
+    PTable writeToTable = null;
+    PTable projectedTable = null;
+    boolean isDescRowKeyOrderUpgrade;
+    final int offset;
+    boolean buildLocalIndex;
+    List<IndexMaintainer> indexMaintainers;
+    boolean isPKChanging = false;
+    long ts;
+    PhoenixTransactionProvider txnProvider = null;
+    UngroupedAggregateRegionObserver.MutationList indexMutations;
+    boolean isDelete = false;
+    byte[] replayMutations;
+    boolean isUpsert = false;
+    List<Expression> selectExpressions = null;
+    byte[] deleteCQ = null;
+    byte[] deleteCF = null;
+    byte[] emptyCF = null;
+    byte[] indexUUID;
+    byte[] txState;
+    byte[] clientVersionBytes;
+    long blockingMemStoreSize;
+    long maxBatchSizeBytes = 0L;
+    HTable targetHTable = null;
+    boolean incrScanRefCount = false;
+    byte[] indexMaintainersPtr;
+    boolean useIndexProto;
+
+    public UngroupedAggregateRegionScanner(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+                                           final RegionScanner innerScanner, 
final Region region, final Scan scan,
+                                           final RegionCoprocessorEnvironment 
env,
+                                           final 
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException, SQLException{
+        super(innerScanner);
+        this.env = env;
+        this.region = region;
+        this.scan = scan;
+        this.ungroupedAggregateRegionObserver = 
ungroupedAggregateRegionObserver;
+        this.innerScanner = innerScanner;
+        Configuration conf = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != 
null) {
+            byte[] pageSizeFromScan =
+                    
scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_ROWS);
+            if (pageSizeFromScan != null) {
+                pageSizeInRows = Bytes.toLong(pageSizeFromScan);
+            } else {
+                pageSizeInRows =
+                        conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS,
+                                
QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS);
+            }
+        }
+        ts = scan.getTimeRange().getMax();
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+
+        encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+        int offsetToBe = 0;
+        if (localIndexScan) {
+            /*
+             * For local indexes, we need to set an offset on row key 
expressions to skip
+             * the region start key.
+             */
+            offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? 
region.getRegionInfo().getStartKey().length :
+                    region.getRegionInfo().getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offsetToBe);
+        }
+        offset = offsetToBe;
+
+        byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
+        isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
+        if (isDescRowKeyOrderUpgrade) {
+            LOGGER.debug("Upgrading row key for " + 
region.getRegionInfo().getTable().getNameAsString());
+            projectedTable = deserializeTable(descRowKeyTableBytes);
+            try {
+                writeToTable = PTableImpl.builderWithColumns(projectedTable,
+                        getColumnsToClone(projectedTable))
+                        .setRowKeyOrderOptimizable(true)
+                        .build();
+            } catch (SQLException e) {
+                ServerUtil.throwIOException("Upgrade failed", e); // Impossible
+            }
+            values = new byte[projectedTable.getPKColumns().size()][];
+        }
+        boolean useProto = false;
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+        useProto = localIndexBytes != null;
+        if (localIndexBytes == null) {
+            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        }
+        indexMaintainers = localIndexBytes == null ? null : 
IndexMaintainer.deserialize(localIndexBytes, useProto);
+        indexMutations = localIndexBytes == null ? new 
UngroupedAggregateRegionObserver.MutationList() : new 
UngroupedAggregateRegionObserver.MutationList(1024);
+
+        replayMutations = scan.getAttribute(REPLAY_WRITES);
+        indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+        clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        if (txState != null) {
+            int clientVersion = clientVersionBytes == null ? 
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+            txnProvider = TransactionFactory.getTransactionProvider(txState, 
clientVersion);
+        }
+        byte[] upsertSelectTable = 
scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
+        if (upsertSelectTable != null) {
+            isUpsert = true;
+            projectedTable = deserializeTable(upsertSelectTable);
+            targetHTable = new 
HTable(ungroupedAggregateRegionObserver.getUpsertSelectConfig(),
+                    projectedTable.getPhysicalName().getBytes());
+            selectExpressions = 
deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
+            values = new byte[projectedTable.getPKColumns().size()][];
+            isPKChanging = ExpressionUtil.isPkPositionChanging(new 
TableRef(projectedTable), selectExpressions);
+        } else {
+            byte[] isDeleteAgg = 
scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
+            isDelete = isDeleteAgg != null && 
Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            if (!isDelete) {
+                deleteCF = 
scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
+                deleteCQ = 
scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
+            }
+            emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+        }
+        ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+
+        /**
+         * Slow down the writes if the memstore size more than
+         * (hbase.hregion.memstore.block.multiplier - 1) times 
hbase.hregion.memstore.flush.size
+         * bytes. This avoids flush storm to hdfs for cases like index 
building where reads and
+         * write happen to all the table regions in the server.
+         */
+        blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
+
+        buildLocalIndex = indexMaintainers != null && dataColumns==null && 
!localIndexScan;
+        if(buildLocalIndex) {
+            checkForLocalIndexColumnFamilies(region, indexMaintainers);
+        }
+        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+                || (deleteCQ != null && deleteCF != null) || emptyCF != null 
|| buildLocalIndex) {
+            needToWrite = true;
+            if((isUpsert && (targetHTable == null ||
+                    
!targetHTable.getName().equals(region.getTableDesc().getTableName())))) {
+                needToWrite = false;
+            }
+            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        }
+        minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " " + region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
+        }
+        useIndexProto = true;
+        indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        // for backward compatiblity fall back to look by the old attribute
+        if (indexMaintainersPtr == null) {
+            indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            useIndexProto = false;
+        }
+
+        if (needToWrite) {
+            ungroupedAggregateRegionObserver.incrementScansReferenceCount();
+            incrScanRefCount = true;
+        }
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (needToWrite && incrScanRefCount) {
+            ungroupedAggregateRegionObserver.decrementScansReferenceCount();
+        }
+        try {
+            if (targetHTable != null) {
+                try {
+                    targetHTable.close();
+                } catch (IOException e) {
+                    LOGGER.error("Closing table: " + targetHTable + " failed: 
", e);
+                }
+            }
+        } finally {
+            innerScanner.close();
+        }
+    }
+
+    boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable 
ptr,
+                                UngroupedAggregateRegionObserver.MutationList 
mutations) throws IOException {
+        Arrays.fill(values, null);
+        Cell firstKV = results.get(0);
+        RowKeySchema schema = projectedTable.getRowKeySchema();
+        int maxOffset = schema.iterator(firstKV.getRowArray(), 
firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            Boolean hasValue = schema.next(ptr, i, maxOffset);
+            if (hasValue == null) {
+                break;
+            }
+            ValueSchema.Field field = schema.getField(i);
+            if (field.getSortOrder() == SortOrder.DESC) {
+                // Special case for re-writing DESC ARRAY, as the actual byte 
value needs to change in this case
+                if (field.getDataType().isArrayType()) {
+                    field.getDataType().coerceBytes(ptr, null, 
field.getDataType(),
+                            field.getMaxLength(), field.getScale(), 
field.getSortOrder(),
+                            field.getMaxLength(), field.getScale(), 
field.getSortOrder(), true); // force to use correct separator byte
+                }
+                // Special case for re-writing DESC CHAR or DESC BINARY, to 
force the re-writing of trailing space characters
+                else if (field.getDataType() == PChar.INSTANCE || 
field.getDataType() == PBinary.INSTANCE) {
+                    int len = ptr.getLength();
+                    while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == 
StringUtil.SPACE_UTF8) {
+                        len--;
+                    }
+                    ptr.set(ptr.get(), ptr.getOffset(), len);
+                    // Special case for re-writing DESC FLOAT and DOUBLE, as 
they're not inverted like they should be (PHOENIX-2171)
+                } else if (field.getDataType() == PFloat.INSTANCE || 
field.getDataType() == PDouble.INSTANCE) {
+                    byte[] invertedBytes = SortOrder.invert(ptr.get(), 
ptr.getOffset(), ptr.getLength());
+                    ptr.set(invertedBytes);
+                }
+            } else if (field.getDataType() == PBinary.INSTANCE) {
+                // Remove trailing space characters so that the setValues call 
below will replace them
+                // with the correct zero byte character. Note this is somewhat 
dangerous as these
+                // could be legit, but I don't know what the alternative is.
+                int len = ptr.getLength();
+                while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == 
StringUtil.SPACE_UTF8) {
+                    len--;
+                }
+                ptr.set(ptr.get(), ptr.getOffset(), len);
+            }
+            values[i] = ptr.copyBytes();
+        }
+        writeToTable.newKey(ptr, values);
+        if (Bytes.compareTo(
+                firstKV.getRowArray(), firstKV.getRowOffset() + offset, 
firstKV.getRowLength(),
+                ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
+            return false;
+        }
+        byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        if (offset > 0) { // for local indexes (prepend region start key)
+            byte[] newRowWithOffset = new byte[offset + newRow.length];
+            System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), 
newRowWithOffset, 0, offset);;
+            System.arraycopy(newRow, 0, newRowWithOffset, offset, 
newRow.length);
+            newRow = newRowWithOffset;
+        }
+        byte[] oldRow = Bytes.copy(firstKV.getRowArray(), 
firstKV.getRowOffset(), firstKV.getRowLength());
+        for (Cell cell : results) {
+            // Copy existing cell but with new row key
+            Cell newCell = new KeyValue(newRow, 0, newRow.length,
+                    cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(),
+                    cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength(),
+                    cell.getTimestamp(), 
KeyValue.Type.codeToType(cell.getTypeByte()),
+                    cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+            switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                case Put:
+                    // If Put, point delete old Put
+                    Delete del = new Delete(oldRow);
+                    del.addDeleteMarker(new KeyValue(cell.getRowArray(), 
cell.getRowOffset(), cell.getRowLength(),
+                            cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(),
+                            cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                            cell.getQualifierLength(), cell.getTimestamp(), 
KeyValue.Type.Delete,
+                            ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
+                    mutations.add(del);
+
+                    Put put = new Put(newRow);
+                    put.add(newCell);
+                    mutations.add(put);
+                    break;
+                case Delete:
+                case DeleteColumn:
+                case DeleteFamily:
+                case DeleteFamilyVersion:
+                    Delete delete = new Delete(newRow);
+                    delete.addDeleteMarker(newCell);
+                    mutations.add(delete);
+                    break;
+            }
+        }
+        return true;
+    }
+
+    void buildLocalIndex(Tuple result, List<Cell> results, 
ImmutableBytesWritable ptr,
+                         UngroupedAggregateRegionObserver.MutationList 
mutations) throws IOException {
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            if (!results.isEmpty()) {
+                result.getKey(ptr);
+                ValueGetter valueGetter =
+                        maintainer.createGetterFromKeyValues(
+                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+                                results);
+                Put put = 
maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                        valueGetter, ptr, results.get(0).getTimestamp(),
+                        env.getRegion().getRegionInfo().getStartKey(),
+                        env.getRegion().getRegionInfo().getEndKey());
+
+                if (txnProvider != null) {
+                    put = txnProvider.markPutAsCommitted(put, ts, ts);
+                }
+                indexMutations.add(put);
+            }
+        }
+        result.setKeyValues(results);
+    }
+    void deleteRow(List<Cell> results, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        // FIXME: the version of the Delete constructor without the lock
+        // args was introduced in 0.94.4, thus if we try to use it here
+        // we can no longer use the 0.94.2 version of the client.
+        Cell firstKV = results.get(0);
+        Delete delete = new Delete(firstKV.getRowArray(),
+                firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+        if (replayMutations != null) {
+            delete.setAttribute(REPLAY_WRITES, replayMutations);
+        }
+        mutations.add(delete);
+        // force tephra to ignore this deletes
+        
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+    }
+
+    void deleteCForQ(Tuple result, List<Cell> results, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        // No need to search for delete column, since we project only it
+        // if no empty key value is being set
+        if (emptyCF == null ||
+                result.getValue(deleteCF, deleteCQ) != null) {
+            Delete delete = new Delete(results.get(0).getRowArray(),
+                    results.get(0).getRowOffset(),
+                    results.get(0).getRowLength());
+            delete.deleteColumns(deleteCF,  deleteCQ, ts);
+            // force tephra to ignore this deletes
+            
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+            mutations.add(delete);
+        }
+    }
+    void upsert(Tuple result, ImmutableBytesWritable ptr, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        Arrays.fill(values, null);
+        int bucketNumOffset = 0;
+        if (projectedTable.getBucketNum() != null) {
+            values[0] = new byte[] { 0 };
+            bucketNumOffset = 1;
+        }
+        int i = bucketNumOffset;
+        List<PColumn> projectedColumns = projectedTable.getColumns();
+        for (; i < projectedTable.getPKColumns().size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                values[i] = ptr.copyBytes();
+                // If SortOrder from expression in SELECT doesn't match the
+                // column being projected into then invert the bits.
+                if (expression.getSortOrder() !=
+                        projectedColumns.get(i).getSortOrder()) {
+                    SortOrder.invert(values[i], 0, values[i], 0,
+                            values[i].length);
+                }
+            }else{
+                values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        projectedTable.newKey(ptr, values);
+        PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts, 
ptr, false);
+        for (; i < projectedColumns.size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                PColumn column = projectedColumns.get(i);
+                if (!column.getDataType().isSizeCompatible(ptr, null,
+                        expression.getDataType(), expression.getSortOrder(),
+                        expression.getMaxLength(), expression.getScale(),
+                        column.getMaxLength(), column.getScale())) {
+                    throw new DataExceedsCapacityException(
+                            column.getDataType(), column.getMaxLength(),
+                            column.getScale(), column.getName().getString(), 
ptr);
+                }
+                column.getDataType().coerceBytes(ptr, null,
+                        expression.getDataType(), expression.getMaxLength(),
+                        expression.getScale(), expression.getSortOrder(),
+                        column.getMaxLength(), column.getScale(),
+                        column.getSortOrder(), 
projectedTable.rowKeyOrderOptimizable());
+                byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                row.setValue(column, bytes);
+            }
+        }
+        for (Mutation mutation : row.toRowMutations()) {
+            if (replayMutations != null) {
+                mutation.setAttribute(REPLAY_WRITES, replayMutations);
+            } else if (txnProvider != null && projectedTable.getType() == 
PTableType.INDEX) {
+                mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, 
ts);
+            }
+            mutations.add(mutation);
+        }
+        for (i = 0; i < selectExpressions.size(); i++) {
+            selectExpressions.get(i).reset();
+        }
+    }
+
+    void insertEmptyKeyValue(List<Cell> results, 
UngroupedAggregateRegionObserver.MutationList mutations) {
+        Set<Long> timeStamps =
+                Sets.newHashSetWithExpectedSize(results.size());
+        for (Cell kv : results) {
+            long kvts = kv.getTimestamp();
+            if (!timeStamps.contains(kvts)) {
+                Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+                        kv.getRowLength());
+                put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+                        ByteUtil.EMPTY_BYTE_ARRAY);
+                mutations.add(put);
+            }
+        }
+    }
+    @Override
+    public boolean next(List<Cell> resultsToReturn) throws IOException {
+        boolean hasMore;
+        Configuration conf = env.getConfiguration();
+        final TenantCache tenantCache = GlobalCache.getTenantCache(env, 
ScanUtil.getTenantId(scan));
+        try (MemoryManager.MemoryChunk em = 
tenantCache.getMemoryManager().allocate(0)) {
+            Aggregators aggregators = ServerAggregators.deserialize(
+                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
conf, em);
+            Aggregator[] rowAggregators = aggregators.getAggregators();
+            aggregators.reset(rowAggregators);
+            Cell lastCell = null;
+            int rowCount = 0;
+            boolean hasAny = false;
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+            UngroupedAggregateRegionObserver.MutationList mutations = new 
UngroupedAggregateRegionObserver.MutationList();
+            if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+                    || (deleteCQ != null && deleteCF != null) || emptyCF != 
null || buildLocalIndex) {
+                mutations = new 
UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize + 
maxBatchSize / 10));
+            }
+            region.startRegionOperation();
+            try {
+                synchronized (innerScanner) {
+                    do {
+                        
ungroupedAggregateRegionObserver.checkForRegionClosing();
+                        List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
+                        // Results are potentially returned even when the 
return value of s.next is false
+                        // since this is an indication of whether or not there 
are more values after the
+                        // ones returned
+                        hasMore = innerScanner.nextRaw(results);
+                        if (!results.isEmpty()) {
+                            lastCell = results.get(0);
+                            rowCount++;
+                            result.setKeyValues(results);
+                            if (isDescRowKeyOrderUpgrade) {
+                                if (!descRowKeyOrderUpgrade(results, ptr, 
mutations)) {
+                                    continue;
+                                }
+                            } else if (buildLocalIndex) {
+                                buildLocalIndex(result, results, ptr, 
mutations);
+                            } else if (isDelete) {
+                                deleteRow(results, mutations);
+                            } else if (isUpsert) {
+                                upsert(result, ptr, mutations);
+                            } else if (deleteCF != null && deleteCQ != null) {
+                                deleteCForQ(result, results, mutations);
+                            }
+                            if (emptyCF != null) {
+                                /*
+                                 * If we've specified an emptyCF, then we need 
to insert an empty
+                                 * key value "retroactively" for any key value 
that is visible at
+                                 * the timestamp that the DDL was issued. Key 
values that are not
+                                 * visible at this timestamp will not ever be 
projected up to
+                                 * scans past this timestamp, so don't need to 
be considered.
+                                 * We insert one empty key value per row per 
timestamp.
+                                 */
+                                insertEmptyKeyValue(results, mutations);
+                            }
+                            if (ServerUtil.readyToCommit(mutations.size(), 
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                                
ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr,
+                                        txState, targetHTable, useIndexProto, 
isPKChanging, clientVersionBytes);
+                                mutations.clear();
+                            }
+                            // Commit in batches based on 
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+
+                            if 
(ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), 
maxBatchSize, maxBatchSizeBytes)) {
+                                
setIndexAndTransactionProperties(indexMutations, indexUUID, 
indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
+                                
ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, 
blockingMemStoreSize);
+                                indexMutations.clear();
+                            }
+                            aggregators.aggregate(rowAggregators, result);
+                            hasAny = true;
+                        }
+                    } while (hasMore && rowCount < pageSizeInRows);

Review comment:
       Based on the comment for `UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS ` in 
`QueryServices`, shouldn't this be a ` <= pageSizeInRows` check?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to