ChinmaySKulkarni commented on a change in pull request #936:
URL: https://github.com/apache/phoenix/pull/936#discussion_r520211914



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
##########
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
+
+    protected long pageSizeInRows = Long.MAX_VALUE;
+    protected int maxBatchSize = 0;
+    protected Scan scan;
+    protected RegionScanner innerScanner;
+    protected Region region;
+    private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+    final private RegionCoprocessorEnvironment env;
+    private final boolean useQualifierAsIndex;
+    boolean needToWrite = false;
+    final Pair<Integer, Integer> minMaxQualifiers;
+    byte[][] values = null;
+    PTable.QualifierEncodingScheme encodingScheme;
+    PTable writeToTable = null;
+    PTable projectedTable = null;
+    boolean isDescRowKeyOrderUpgrade;
+    final int offset;
+    boolean buildLocalIndex;
+    List<IndexMaintainer> indexMaintainers;
+    boolean isPKChanging = false;
+    long ts;
+    PhoenixTransactionProvider txnProvider = null;
+    UngroupedAggregateRegionObserver.MutationList indexMutations;
+    boolean isDelete = false;
+    byte[] replayMutations;
+    boolean isUpsert = false;
+    List<Expression> selectExpressions = null;
+    byte[] deleteCQ = null;
+    byte[] deleteCF = null;
+    byte[] emptyCF = null;
+    byte[] indexUUID;
+    byte[] txState;
+    byte[] clientVersionBytes;
+    long blockingMemStoreSize;
+    long maxBatchSizeBytes = 0L;
+    HTable targetHTable = null;
+    boolean incrScanRefCount = false;
+    byte[] indexMaintainersPtr;
+    boolean useIndexProto;
+
+    public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                           final RegionScanner innerScanner, final Region region, final Scan scan,
+                                           final RegionCoprocessorEnvironment env,
+                                           final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException, SQLException {
+        super(innerScanner);
+        this.env = env;
+        this.region = region;
+        this.scan = scan;
+        this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+        this.innerScanner = innerScanner;
+        Configuration conf = env.getConfiguration();
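+        // If the client asked for server paging, cap how many rows are aggregated per next() call,
+        // taking the limit from the scan attribute when present, otherwise from configuration.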
+        if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
+            byte[] pageSizeFromScan =
+                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_ROWS);
+            if (pageSizeFromScan != null) {
+                pageSizeInRows = Bytes.toLong(pageSizeFromScan);
+            } else {
+                pageSizeInRows =
+                        conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS,
+                                QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_ROWS);
+            }
+        }
+        ts = scan.getTimeRange().getMax();
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+
+        encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+        int offsetToBe = 0;
+        if (localIndexScan) {
+            /*
+             * For local indexes, we need to set an offset on row key expressions to skip
+             * the region start key.
+             */
+            offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
+                    region.getRegionInfo().getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offsetToBe);
+        }
+        offset = offsetToBe;
+
+        byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
+        isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
+        if (isDescRowKeyOrderUpgrade) {
+            LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString());
+            projectedTable = deserializeTable(descRowKeyTableBytes);
+            try {
+                writeToTable = PTableImpl.builderWithColumns(projectedTable,
+                        getColumnsToClone(projectedTable))
+                        .setRowKeyOrderOptimizable(true)
+                        .build();
+            } catch (SQLException e) {
+                ServerUtil.throwIOException("Upgrade failed", e); // Impossible
+            }
+            values = new byte[projectedTable.getPKColumns().size()][];
+        }
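+        // Local index rebuild metadata may arrive either in the protobuf-based attribute or in the
+        // older serialized form; prefer the protobuf attribute and fall back to the legacy one.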
+        boolean useProto = false;
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+        useProto = localIndexBytes != null;
+        if (localIndexBytes == null) {
+            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        }
+        indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
+        indexMutations = localIndexBytes == null ? new UngroupedAggregateRegionObserver.MutationList()
+                : new UngroupedAggregateRegionObserver.MutationList(1024);
+
+        replayMutations = scan.getAttribute(REPLAY_WRITES);
+        indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+        clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        if (txState != null) {
+            int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+            txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
+        }
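+        // A server-side UPSERT SELECT carries the serialized target table and select expressions;
+        // otherwise the scan may be a server-side DELETE (whole rows or a specific column) and/or
+        // carry the empty column family for which empty key values are written.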
+        byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
+        if (upsertSelectTable != null) {
+            isUpsert = true;
+            projectedTable = deserializeTable(upsertSelectTable);
+            targetHTable = new HTable(ungroupedAggregateRegionObserver.getUpsertSelectConfig(),
+                    projectedTable.getPhysicalName().getBytes());
+            selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
+            values = new byte[projectedTable.getPKColumns().size()][];
+            isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+        } else {
+            byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
+            isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            if (!isDelete) {
+                deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
+                deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
+            }
+            emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+        }
+        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+
+        /**
+         * Slow down the writes if the memstore size is more than
+         * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size
+         * bytes. This avoids flush storms to HDFS for cases like index building, where reads and
+         * writes happen to all the table regions in the server.
+         */
+        blockingMemStoreSize = getBlockingMemstoreSize(region, conf);
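+        // For example, with typical HBase defaults (hbase.hregion.memstore.flush.size = 128 MB and
+        // hbase.hregion.memstore.block.multiplier = 4), writes are slowed once the region's memstore
+        // exceeds (4 - 1) * 128 MB = 384 MB.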
+
+        buildLocalIndex = indexMaintainers != null && dataColumns == null && !localIndexScan;
+        if (buildLocalIndex) {
+            checkForLocalIndexColumnFamilies(region, indexMaintainers);
+        }
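+        // This scanner only issues writes for the mutating scan types: the DESC row key upgrade,
+        // server-side DELETE, UPSERT SELECT, single-column delete, empty-CF maintenance, and local
+        // index builds. An UPSERT SELECT whose target is a different table writes through
+        // targetHTable rather than this region, so it is excluded below.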
+        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+                || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+            needToWrite = true;
+            if ((isUpsert && (targetHTable == null ||
+                    !targetHTable.getName().equals(region.getTableDesc().getTableName())))) {
+                needToWrite = false;
+            }
+            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        }
+        minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " " + region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
+        }
+        useIndexProto = true;
+        indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        // for backward compatibility, fall back to looking up the old attribute
+        if (indexMaintainersPtr == null) {
+            indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            useIndexProto = false;
+        }
+
+        if (needToWrite) {
+            ungroupedAggregateRegionObserver.incrementScansReferenceCount();
+            incrScanRefCount = true;
+        }
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (needToWrite && incrScanRefCount) {
+            ungroupedAggregateRegionObserver.decrementScansReferenceCount();
+        }
+        try {
+            if (targetHTable != null) {
+                try {
+                    targetHTable.close();
+                } catch (IOException e) {
+                    LOGGER.error("Closing table: " + targetHTable + " failed: ", e);
+                }
+            }
+        } finally {
+            innerScanner.close();
+        }
+    }
+
+    boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable ptr,
+                                   UngroupedAggregateRegionObserver.MutationList mutations) throws IOException {
+        Arrays.fill(values, null);
+        Cell firstKV = results.get(0);
+        RowKeySchema schema = projectedTable.getRowKeySchema();
+        int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            Boolean hasValue = schema.next(ptr, i, maxOffset);
+            if (hasValue == null) {
+                break;
+            }
+            ValueSchema.Field field = schema.getField(i);
+            if (field.getSortOrder() == SortOrder.DESC) {
+                // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case
+                if (field.getDataType().isArrayType()) {
+                    field.getDataType().coerceBytes(ptr, null, field.getDataType(),
+                            field.getMaxLength(), field.getScale(), field.getSortOrder(),
+                            field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
+                }
+                // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters
+                else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
+                    int len = ptr.getLength();
+                    while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
+                        len--;
+                    }
+                    ptr.set(ptr.get(), ptr.getOffset(), len);
+                    // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
+                } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
+                    byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
+                    ptr.set(invertedBytes);
+                }
+            } else if (field.getDataType() == PBinary.INSTANCE) {
+                // Remove trailing space characters so that the setValues call below will replace them
+                // with the correct zero byte character. Note this is somewhat dangerous as these
+                // could be legit, but I don't know what the alternative is.
+                int len = ptr.getLength();
+                while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
+                    len--;
+                }
+                ptr.set(ptr.get(), ptr.getOffset(), len);
+            }
+            values[i] = ptr.copyBytes();
+        }
+        writeToTable.newKey(ptr, values);
+        if (Bytes.compareTo(
+                firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(),
+                ptr.get(), ptr.getOffset() + offset, ptr.getLength()) == 0) {
+            return false;
+        }
+        byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        if (offset > 0) { // for local indexes (prepend region start key)
+            byte[] newRowWithOffset = new byte[offset + newRow.length];
+            System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
+            System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length);
+            newRow = newRowWithOffset;
+        }
+        byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength());
+        for (Cell cell : results) {
+            // Copy existing cell but with new row key
+            Cell newCell = new KeyValue(newRow, 0, newRow.length,
+                    cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                    cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                    cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                case Put:
+                    // If Put, point delete old Put
+                    Delete del = new Delete(oldRow);
+                    del.addDeleteMarker(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+                            cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                            cell.getQualifierArray(), cell.getQualifierOffset(),
+                            cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete,
+                            ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
+                    mutations.add(del);
+
+                    Put put = new Put(newRow);
+                    put.add(newCell);
+                    mutations.add(put);
+                    break;
+                case Delete:
+                case DeleteColumn:
+                case DeleteFamily:
+                case DeleteFamilyVersion:
+                    Delete delete = new Delete(newRow);
+                    delete.addDeleteMarker(newCell);
+                    mutations.add(delete);
+                    break;
+            }
+        }
+        return true;
+    }
+
+    void buildLocalIndex(Tuple result, List<Cell> results, ImmutableBytesWritable ptr,
+                         UngroupedAggregateRegionObserver.MutationList mutations) throws IOException {
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            if (!results.isEmpty()) {
+                result.getKey(ptr);
+                ValueGetter valueGetter =
+                        maintainer.createGetterFromKeyValues(
+                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+                                results);
+                Put put = maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                        valueGetter, ptr, results.get(0).getTimestamp(),
+                        env.getRegion().getRegionInfo().getStartKey(),
+                        env.getRegion().getRegionInfo().getEndKey());
+
+                if (txnProvider != null) {
+                    put = txnProvider.markPutAsCommitted(put, ts, ts);
+                }
+                indexMutations.add(put);
+            }
+        }
+        result.setKeyValues(results);
+    }
+    void deleteRow(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+        // FIXME: the version of the Delete constructor without the lock
+        // args was introduced in 0.94.4, thus if we try to use it here
+        // we can no longer use the 0.94.2 version of the client.
+        Cell firstKV = results.get(0);
+        Delete delete = new Delete(firstKV.getRowArray(),
+                firstKV.getRowOffset(), firstKV.getRowLength(), ts);
+        if (replayMutations != null) {
+            delete.setAttribute(REPLAY_WRITES, replayMutations);
+        }
+        mutations.add(delete);
+        // force Tephra to ignore this delete
+        delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+    }
+
+    void deleteCForQ(Tuple result, List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+        // No need to search for delete column, since we project only it
+        // if no empty key value is being set
+        if (emptyCF == null ||
+                result.getValue(deleteCF, deleteCQ) != null) {
+            Delete delete = new Delete(results.get(0).getRowArray(),
+                    results.get(0).getRowOffset(),
+                    results.get(0).getRowLength());
+            delete.deleteColumns(deleteCF, deleteCQ, ts);
+            // force Tephra to ignore this delete
+            delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+            mutations.add(delete);
+        }
+    }
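+    // Builds the UPSERT SELECT target row: evaluates the select expressions for the PK columns to
+    // form the new row key, then evaluates and coerces the remaining projected columns before
+    // turning the row into mutations.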
+    void upsert(Tuple result, ImmutableBytesWritable ptr, UngroupedAggregateRegionObserver.MutationList mutations) {
+        Arrays.fill(values, null);
+        int bucketNumOffset = 0;
+        if (projectedTable.getBucketNum() != null) {
+            values[0] = new byte[] { 0 };
+            bucketNumOffset = 1;
+        }
+        int i = bucketNumOffset;
+        List<PColumn> projectedColumns = projectedTable.getColumns();
+        for (; i < projectedTable.getPKColumns().size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                values[i] = ptr.copyBytes();
+                // If SortOrder from expression in SELECT doesn't match the
+                // column being projected into then invert the bits.
+                if (expression.getSortOrder() !=
+                        projectedColumns.get(i).getSortOrder()) {
+                    SortOrder.invert(values[i], 0, values[i], 0,
+                            values[i].length);
+                }
+            } else {
+                values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        projectedTable.newKey(ptr, values);
+        PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+        for (; i < projectedColumns.size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                PColumn column = projectedColumns.get(i);
+                if (!column.getDataType().isSizeCompatible(ptr, null,
+                        expression.getDataType(), expression.getSortOrder(),
+                        expression.getMaxLength(), expression.getScale(),
+                        column.getMaxLength(), column.getScale())) {
+                    throw new DataExceedsCapacityException(
+                            column.getDataType(), column.getMaxLength(),
+                            column.getScale(), column.getName().getString(), ptr);
+                }
+                column.getDataType().coerceBytes(ptr, null,
+                        expression.getDataType(), expression.getMaxLength(),
+                        expression.getScale(), expression.getSortOrder(),
+                        column.getMaxLength(), column.getScale(),
+                        column.getSortOrder(), projectedTable.rowKeyOrderOptimizable());
+                byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                row.setValue(column, bytes);
+            }
+        }
+        for (Mutation mutation : row.toRowMutations()) {
+            if (replayMutations != null) {
+                mutation.setAttribute(REPLAY_WRITES, replayMutations);
+            } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
+                mutation = txnProvider.markPutAsCommitted((Put) mutation, ts, ts);
+            }
+            mutations.add(mutation);
+        }
+        for (i = 0; i < selectExpressions.size(); i++) {
+            selectExpressions.get(i).reset();
+        }
+    }
+
+    void insertEmptyKeyValue(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+        Set<Long> timeStamps =
+                Sets.newHashSetWithExpectedSize(results.size());
+        for (Cell kv : results) {
+            long kvts = kv.getTimestamp();
+            if (!timeStamps.contains(kvts)) {
+                Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+                        kv.getRowLength());
+                put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+                        ByteUtil.EMPTY_BYTE_ARRAY);
+                mutations.add(put);
+            }
+        }
+    }
+    @Override
+    public boolean next(List<Cell> resultsToReturn) throws IOException {

Review comment:
       Just a generic question: if the RegionServer dies and the server-side scanner dies with it, a new scanner that resumes the work will start again from row 0 within the region. However, could the previous scanner already have returned rows to the client, for which the client then issued the DELETE/UPSERT operations corresponding to the UPSERT SELECT/DELETE? Does something inside UARO prevent this?
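
       To make the concern concrete, here is a hypothetical, self-contained sketch (names are made up, this is not Phoenix code): a scanner that issues one mutation per row and crashes part-way through will, once a replacement scanner restarts from row 0 of the region, re-issue mutations for the rows it already handled, unless each mutation is idempotent or the retry is fenced somewhere else.

       ```java
       import java.util.ArrayList;
       import java.util.List;

       public class ScannerRestartSketch {
           public static void main(String[] args) {
               List<String> regionRows = List.of("r1", "r2", "r3", "r4");
               List<String> applied = new ArrayList<>();

               // First attempt: "crashes" after mutating two rows.
               processRegion(regionRows, applied, 2);
               // Retry: a new scanner starts again from row 0 of the region.
               processRegion(regionRows, applied, Integer.MAX_VALUE);

               // Unless each per-row mutation is idempotent (e.g. a DELETE, or an UPSERT that
               // rewrites the same cell at the same timestamp), r1 and r2 are mutated twice.
               System.out.println(applied); // [r1, r2, r1, r2, r3, r4]
           }

           static void processRegion(List<String> rows, List<String> applied, int crashAfter) {
               for (int i = 0; i < rows.size() && i < crashAfter; i++) {
                   applied.add(rows.get(i)); // stand-in for issuing an UPSERT/DELETE mutation
               }
           }
       }
       ```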




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

