[
https://issues.apache.org/jira/browse/PHOENIX-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230196#comment-17230196
]
ASF GitHub Bot commented on PHOENIX-5998:
-----------------------------------------
ChinmaySKulkarni commented on a change in pull request #936:
URL: https://github.com/apache/phoenix/pull/936#discussion_r521609388
##########
File path: phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
##########
@@ -629,6 +626,11 @@ public static Configuration
setUpConfigForMiniCluster(Configuration conf, ReadOn
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+ if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0)
== 0) {
+ conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
Review comment:
Did you mean to set this to some non-zero value?
##########
File path: phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
##########
@@ -629,6 +626,11 @@ public static Configuration
setUpConfigForMiniCluster(Configuration conf, ReadOn
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+ if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0)
== 0) {
+ conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
+ // This results in processing one row at a time in each next
operation of the aggregate region
+ // scanner, i.e., one row pages
Review comment:
This comment needs to be changed to reflect the "time" aspect of paging
rather than number of rows, right?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
##########
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES;
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY;
+import static
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies;
+import static
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions;
+import static
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable;
+import static
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize;
+import static
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
+
+ protected long pageSizeInMs = Long.MAX_VALUE;
+ protected int maxBatchSize = 0;
+ protected Scan scan;
+ protected RegionScanner innerScanner;
+ protected Region region;
+ private final UngroupedAggregateRegionObserver
ungroupedAggregateRegionObserver;
+ final private RegionCoprocessorEnvironment env;
+ private final boolean useQualifierAsIndex;
+ boolean needToWrite = false;
+ final Pair<Integer, Integer> minMaxQualifiers;
+ byte[][] values = null;
+ PTable.QualifierEncodingScheme encodingScheme;
+ PTable writeToTable = null;
+ PTable projectedTable = null;
+ boolean isDescRowKeyOrderUpgrade;
+ final int offset;
+ boolean buildLocalIndex;
+ List<IndexMaintainer> indexMaintainers;
+ boolean isPKChanging = false;
+ long ts;
+ PhoenixTransactionProvider txnProvider = null;
+ UngroupedAggregateRegionObserver.MutationList indexMutations;
+ boolean isDelete = false;
+ byte[] replayMutations;
+ boolean isUpsert = false;
+ List<Expression> selectExpressions = null;
+ byte[] deleteCQ = null;
+ byte[] deleteCF = null;
+ byte[] emptyCF = null;
+ byte[] indexUUID;
+ byte[] txState;
+ byte[] clientVersionBytes;
+ long blockingMemStoreSize;
+ long maxBatchSizeBytes = 0L;
+ HTable targetHTable = null;
+ boolean incrScanRefCount = false;
+ byte[] indexMaintainersPtr;
+ boolean useIndexProto;
+
+ public UngroupedAggregateRegionScanner(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ final RegionScanner innerScanner,
final Region region, final Scan scan,
+ final RegionCoprocessorEnvironment
env,
+ final
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+ throws IOException, SQLException{
+ super(innerScanner);
+ this.env = env;
+ this.region = region;
+ this.scan = scan;
+ this.ungroupedAggregateRegionObserver =
ungroupedAggregateRegionObserver;
+ this.innerScanner = innerScanner;
+ Configuration conf = env.getConfiguration();
+ if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) !=
null) {
+ byte[] pageSizeFromScan =
+
scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_ROWS);
+ if (pageSizeFromScan != null) {
+ pageSizeInMs = Bytes.toLong(pageSizeFromScan);
+ } else {
+ pageSizeInMs =
+ conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
+
QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
+ }
+ }
+ ts = scan.getTimeRange().getMax();
+ boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+
+ encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ int offsetToBe = 0;
+ if (localIndexScan) {
+ /*
+ * For local indexes, we need to set an offset on row key
expressions to skip
+ * the region start key.
+ */
+ offsetToBe = region.getRegionInfo().getStartKey().length != 0 ?
region.getRegionInfo().getStartKey().length :
+ region.getRegionInfo().getEndKey().length;
+ ScanUtil.setRowKeyOffset(scan, offsetToBe);
+ }
+ offset = offsetToBe;
+
+ byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
+ isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
+ if (isDescRowKeyOrderUpgrade) {
+ LOGGER.debug("Upgrading row key for " +
region.getRegionInfo().getTable().getNameAsString());
+ projectedTable = deserializeTable(descRowKeyTableBytes);
+ try {
+ writeToTable = PTableImpl.builderWithColumns(projectedTable,
+ getColumnsToClone(projectedTable))
+ .setRowKeyOrderOptimizable(true)
+ .build();
+ } catch (SQLException e) {
+ ServerUtil.throwIOException("Upgrade failed", e); // Impossible
+ }
+ values = new byte[projectedTable.getPKColumns().size()][];
+ }
+ boolean useProto = false;
+ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+ useProto = localIndexBytes != null;
+ if (localIndexBytes == null) {
+ localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+ }
+ indexMaintainers = localIndexBytes == null ? null :
IndexMaintainer.deserialize(localIndexBytes, useProto);
+ indexMutations = localIndexBytes == null ? new
UngroupedAggregateRegionObserver.MutationList() : new
UngroupedAggregateRegionObserver.MutationList(1024);
+
+ replayMutations = scan.getAttribute(REPLAY_WRITES);
+ indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+ txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+ clientVersionBytes =
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ if (txState != null) {
+ int clientVersion = clientVersionBytes == null ?
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+ txnProvider = TransactionFactory.getTransactionProvider(txState,
clientVersion);
+ }
+ byte[] upsertSelectTable =
scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
+ if (upsertSelectTable != null) {
+ isUpsert = true;
+ projectedTable = deserializeTable(upsertSelectTable);
+ targetHTable = new
HTable(ungroupedAggregateRegionObserver.getUpsertSelectConfig(),
+ projectedTable.getPhysicalName().getBytes());
+ selectExpressions =
deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
+ values = new byte[projectedTable.getPKColumns().size()][];
+ isPKChanging = ExpressionUtil.isPkPositionChanging(new
TableRef(projectedTable), selectExpressions);
+ } else {
+ byte[] isDeleteAgg =
scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
+ isDelete = isDeleteAgg != null &&
Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+ if (!isDelete) {
+ deleteCF =
scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
+ deleteCQ =
scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
+ }
+ emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+ }
+ ColumnReference[] dataColumns =
IndexUtil.deserializeDataTableColumnsToJoin(scan);
+ useQualifierAsIndex =
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+
+ /**
+ * Slow down the writes if the memstore size is more than
+ * (hbase.hregion.memstore.block.multiplier - 1) times
hbase.hregion.memstore.flush.size
+ * bytes. This avoids a flush storm to HDFS for cases like index
building where reads and
+ * writes happen to all the table regions in the server.
+ */
+ blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
+
+ buildLocalIndex = indexMaintainers != null && dataColumns==null &&
!localIndexScan;
+ if(buildLocalIndex) {
+ checkForLocalIndexColumnFamilies(region, indexMaintainers);
+ }
+ if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+ || (deleteCQ != null && deleteCF != null) || emptyCF != null
|| buildLocalIndex) {
+ needToWrite = true;
+ if((isUpsert && (targetHTable == null ||
+
!targetHTable.getName().equals(region.getTableDesc().getTableName())))) {
+ needToWrite = false;
+ }
+ maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ }
+ minMaxQualifiers =
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped
coprocessor scan " + scan + " " + region.getRegionInfo(),
ScanUtil.getCustomAnnotations(scan)));
+ }
+ useIndexProto = true;
+ indexMaintainersPtr =
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ // for backward compatibility, fall back to looking up the old attribute
+ if (indexMaintainersPtr == null) {
+ indexMaintainersPtr =
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ useIndexProto = false;
+ }
+
+ if (needToWrite) {
+ ungroupedAggregateRegionObserver.incrementScansReferenceCount();
+ incrScanRefCount = true;
+ }
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (needToWrite && incrScanRefCount) {
+ ungroupedAggregateRegionObserver.decrementScansReferenceCount();
+ }
+ try {
+ if (targetHTable != null) {
+ try {
+ targetHTable.close();
+ } catch (IOException e) {
+ LOGGER.error("Closing table: " + targetHTable + " failed:
", e);
+ }
+ }
+ } finally {
+ innerScanner.close();
+ }
+ }
+
+ boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable
ptr,
+ UngroupedAggregateRegionObserver.MutationList
mutations) throws IOException {
+ Arrays.fill(values, null);
+ Cell firstKV = results.get(0);
+ RowKeySchema schema = projectedTable.getRowKeySchema();
+ int maxOffset = schema.iterator(firstKV.getRowArray(),
firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ Boolean hasValue = schema.next(ptr, i, maxOffset);
+ if (hasValue == null) {
+ break;
+ }
+ ValueSchema.Field field = schema.getField(i);
+ if (field.getSortOrder() == SortOrder.DESC) {
+ // Special case for re-writing DESC ARRAY, as the actual byte
value needs to change in this case
+ if (field.getDataType().isArrayType()) {
+ field.getDataType().coerceBytes(ptr, null,
field.getDataType(),
+ field.getMaxLength(), field.getScale(),
field.getSortOrder(),
+ field.getMaxLength(), field.getScale(),
field.getSortOrder(), true); // force to use correct separator byte
+ }
+ // Special case for re-writing DESC CHAR or DESC BINARY, to
force the re-writing of trailing space characters
+ else if (field.getDataType() == PChar.INSTANCE ||
field.getDataType() == PBinary.INSTANCE) {
+ int len = ptr.getLength();
+ while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] ==
StringUtil.SPACE_UTF8) {
+ len--;
+ }
+ ptr.set(ptr.get(), ptr.getOffset(), len);
+ // Special case for re-writing DESC FLOAT and DOUBLE, as
they're not inverted like they should be (PHOENIX-2171)
+ } else if (field.getDataType() == PFloat.INSTANCE ||
field.getDataType() == PDouble.INSTANCE) {
+ byte[] invertedBytes = SortOrder.invert(ptr.get(),
ptr.getOffset(), ptr.getLength());
+ ptr.set(invertedBytes);
+ }
+ } else if (field.getDataType() == PBinary.INSTANCE) {
+ // Remove trailing space characters so that the setValues call
below will replace them
+ // with the correct zero byte character. Note this is somewhat
dangerous as these
+ // could be legit, but I don't know what the alternative is.
+ int len = ptr.getLength();
+ while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] ==
StringUtil.SPACE_UTF8) {
+ len--;
+ }
+ ptr.set(ptr.get(), ptr.getOffset(), len);
+ }
+ values[i] = ptr.copyBytes();
+ }
+ writeToTable.newKey(ptr, values);
+ if (Bytes.compareTo(
+ firstKV.getRowArray(), firstKV.getRowOffset() + offset,
firstKV.getRowLength(),
+ ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
+ return false;
+ }
+ byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ if (offset > 0) { // for local indexes (prepend region start key)
+ byte[] newRowWithOffset = new byte[offset + newRow.length];
+ System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(),
newRowWithOffset, 0, offset);;
+ System.arraycopy(newRow, 0, newRowWithOffset, offset,
newRow.length);
+ newRow = newRowWithOffset;
+ }
+ byte[] oldRow = Bytes.copy(firstKV.getRowArray(),
firstKV.getRowOffset(), firstKV.getRowLength());
+ for (Cell cell : results) {
+ // Copy existing cell but with new row key
+ Cell newCell = new KeyValue(newRow, 0, newRow.length,
+ cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength(),
+ cell.getTimestamp(),
KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
+ switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+ case Put:
+ // If Put, point delete old Put
+ Delete del = new Delete(oldRow);
+ del.addDeleteMarker(new KeyValue(cell.getRowArray(),
cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(),
+ cell.getQualifierArray(),
cell.getQualifierOffset(),
+ cell.getQualifierLength(), cell.getTimestamp(),
KeyValue.Type.Delete,
+ ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
+ mutations.add(del);
+
+ Put put = new Put(newRow);
+ put.add(newCell);
+ mutations.add(put);
+ break;
+ case Delete:
+ case DeleteColumn:
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ Delete delete = new Delete(newRow);
+ delete.addDeleteMarker(newCell);
+ mutations.add(delete);
+ break;
+ }
+ }
+ return true;
+ }
+
+ void buildLocalIndex(Tuple result, List<Cell> results,
ImmutableBytesWritable ptr,
+ UngroupedAggregateRegionObserver.MutationList
mutations) throws IOException {
+ for (IndexMaintainer maintainer : indexMaintainers) {
+ if (!results.isEmpty()) {
+ result.getKey(ptr);
+ ValueGetter valueGetter =
+ maintainer.createGetterFromKeyValues(
+ ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+ results);
+ Put put =
maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ valueGetter, ptr, results.get(0).getTimestamp(),
+ env.getRegion().getRegionInfo().getStartKey(),
+ env.getRegion().getRegionInfo().getEndKey());
+
+ if (txnProvider != null) {
+ put = txnProvider.markPutAsCommitted(put, ts, ts);
+ }
+ indexMutations.add(put);
+ }
+ }
+ result.setKeyValues(results);
+ }
+ void deleteRow(List<Cell> results,
UngroupedAggregateRegionObserver.MutationList mutations) {
+ // FIXME: the version of the Delete constructor without the lock
+ // args was introduced in 0.94.4, thus if we try to use it here
+ // we can no longer use the 0.94.2 version of the client.
+ Cell firstKV = results.get(0);
+ Delete delete = new Delete(firstKV.getRowArray(),
+ firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+ if (replayMutations != null) {
+ delete.setAttribute(REPLAY_WRITES, replayMutations);
+ }
+ mutations.add(delete);
+ // force tephra to ignore this delete
+
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new
byte[0]);
+ }
+
+ void deleteCForQ(Tuple result, List<Cell> results,
UngroupedAggregateRegionObserver.MutationList mutations) {
+ // No need to search for delete column, since we project only it
+ // if no empty key value is being set
+ if (emptyCF == null ||
+ result.getValue(deleteCF, deleteCQ) != null) {
+ Delete delete = new Delete(results.get(0).getRowArray(),
+ results.get(0).getRowOffset(),
+ results.get(0).getRowLength());
+ delete.deleteColumns(deleteCF, deleteCQ, ts);
+ // force tephra to ignore this delete
+
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new
byte[0]);
+ mutations.add(delete);
+ }
+ }
+ void upsert(Tuple result, ImmutableBytesWritable ptr,
UngroupedAggregateRegionObserver.MutationList mutations) {
+ Arrays.fill(values, null);
+ int bucketNumOffset = 0;
+ if (projectedTable.getBucketNum() != null) {
+ values[0] = new byte[] { 0 };
+ bucketNumOffset = 1;
+ }
+ int i = bucketNumOffset;
+ List<PColumn> projectedColumns = projectedTable.getColumns();
+ for (; i < projectedTable.getPKColumns().size(); i++) {
+ Expression expression = selectExpressions.get(i - bucketNumOffset);
+ if (expression.evaluate(result, ptr)) {
+ values[i] = ptr.copyBytes();
+ // If SortOrder from expression in SELECT doesn't match the
+ // column being projected into then invert the bits.
+ if (expression.getSortOrder() !=
+ projectedColumns.get(i).getSortOrder()) {
+ SortOrder.invert(values[i], 0, values[i], 0,
+ values[i].length);
+ }
+ }else{
+ values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
+ }
+ }
+ projectedTable.newKey(ptr, values);
+ PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts,
ptr, false);
+ for (; i < projectedColumns.size(); i++) {
+ Expression expression = selectExpressions.get(i - bucketNumOffset);
+ if (expression.evaluate(result, ptr)) {
+ PColumn column = projectedColumns.get(i);
+ if (!column.getDataType().isSizeCompatible(ptr, null,
+ expression.getDataType(), expression.getSortOrder(),
+ expression.getMaxLength(), expression.getScale(),
+ column.getMaxLength(), column.getScale())) {
+ throw new DataExceedsCapacityException(
+ column.getDataType(), column.getMaxLength(),
+ column.getScale(), column.getName().getString(),
ptr);
+ }
+ column.getDataType().coerceBytes(ptr, null,
+ expression.getDataType(), expression.getMaxLength(),
+ expression.getScale(), expression.getSortOrder(),
+ column.getMaxLength(), column.getScale(),
+ column.getSortOrder(),
projectedTable.rowKeyOrderOptimizable());
+ byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ row.setValue(column, bytes);
+ }
+ }
+ for (Mutation mutation : row.toRowMutations()) {
+ if (replayMutations != null) {
+ mutation.setAttribute(REPLAY_WRITES, replayMutations);
+ } else if (txnProvider != null && projectedTable.getType() ==
PTableType.INDEX) {
+ mutation = txnProvider.markPutAsCommitted((Put)mutation, ts,
ts);
+ }
+ mutations.add(mutation);
+ }
+ for (i = 0; i < selectExpressions.size(); i++) {
+ selectExpressions.get(i).reset();
+ }
+ }
+
+ void insertEmptyKeyValue(List<Cell> results,
UngroupedAggregateRegionObserver.MutationList mutations) {
+ Set<Long> timeStamps =
+ Sets.newHashSetWithExpectedSize(results.size());
+ for (Cell kv : results) {
+ long kvts = kv.getTimestamp();
+ if (!timeStamps.contains(kvts)) {
+ Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+ kv.getRowLength());
+ put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ mutations.add(put);
+ }
+ }
+ }
+ @Override
+ public boolean next(List<Cell> resultsToReturn) throws IOException {
+ boolean hasMore;
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ Configuration conf = env.getConfiguration();
+ final TenantCache tenantCache = GlobalCache.getTenantCache(env,
ScanUtil.getTenantId(scan));
+ try (MemoryManager.MemoryChunk em =
tenantCache.getMemoryManager().allocate(0)) {
+ Aggregators aggregators = ServerAggregators.deserialize(
+ scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS),
conf, em);
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ Cell lastCell = null;
+ boolean hasAny = false;
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ Tuple result = useQualifierAsIndex ? new
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+ UngroupedAggregateRegionObserver.MutationList mutations = new
UngroupedAggregateRegionObserver.MutationList();
+ if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+ || (deleteCQ != null && deleteCF != null) || emptyCF !=
null || buildLocalIndex) {
+ mutations = new
UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize +
maxBatchSize / 10));
+ }
+ region.startRegionOperation();
+ try {
+ synchronized (innerScanner) {
+ do {
+
ungroupedAggregateRegionObserver.checkForRegionClosing();
+ List<Cell> results = useQualifierAsIndex ? new
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
+ // Results are potentially returned even when the
return value of s.next is false
+ // since this is an indication of whether or not there
are more values after the
+ // ones returned
+ hasMore = innerScanner.nextRaw(results);
+ if (!results.isEmpty()) {
+ lastCell = results.get(0);
+ result.setKeyValues(results);
+ if (isDescRowKeyOrderUpgrade) {
+ if (!descRowKeyOrderUpgrade(results, ptr,
mutations)) {
+ continue;
+ }
+ } else if (buildLocalIndex) {
+ buildLocalIndex(result, results, ptr,
mutations);
+ } else if (isDelete) {
+ deleteRow(results, mutations);
+ } else if (isUpsert) {
+ upsert(result, ptr, mutations);
+ } else if (deleteCF != null && deleteCQ != null) {
+ deleteCForQ(result, results, mutations);
+ }
+ if (emptyCF != null) {
+ /*
+ * If we've specified an emptyCF, then we need
to insert an empty
+ * key value "retroactively" for any key value
that is visible at
+ * the timestamp that the DDL was issued. Key
values that are not
+ * visible at this timestamp will not ever be
projected up to
+ * scans past this timestamp, so don't need to
be considered.
+ * We insert one empty key value per row per
timestamp.
+ */
+ insertEmptyKeyValue(results, mutations);
+ }
+ if (ServerUtil.readyToCommit(mutations.size(),
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+
ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID,
blockingMemStoreSize, indexMaintainersPtr,
+ txState, targetHTable, useIndexProto,
isPKChanging, clientVersionBytes);
+ mutations.clear();
+ }
+ // Commit in batches based on
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+
+ if
(ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(),
maxBatchSize, maxBatchSizeBytes)) {
+
setIndexAndTransactionProperties(indexMutations, indexUUID,
indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
+
ungroupedAggregateRegionObserver.commitBatch(region, indexMutations,
blockingMemStoreSize);
+ indexMutations.clear();
+ }
+ aggregators.aggregate(rowAggregators, result);
+ hasAny = true;
+ }
+ } while (hasMore &&
(EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs);
Review comment:
This logic is not sufficient to ensure that at most `pageSizeInMs` of time is
spent on the server-side operation. If, say, `pageSizeInMs=1000` and the last
iteration starts at 990 ms but takes much longer than 10 ms, we can end up
spending much more time than the configured page size duration. Is that fine?
If not, we could use something like an `ExecutorService` with a timeout and run
the entire `do while` loop in a separate thread. That would ensure that no more
than `pageSizeInMs` is ever spent on this. However, it might complicate things
a bit if work being done inside the loop could be left in an incomplete state
when the thread is interrupted. Also, with this approach we would not guarantee
even one iteration of the loop, in spite of it being a `do while`. It depends
on how strict we want the paging duration limit to be. What do you think?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Paged server side ungrouped aggregate operations
> -------------------------------------------------
>
> Key: PHOENIX-5998
> URL: https://issues.apache.org/jira/browse/PHOENIX-5998
> Project: Phoenix
> Issue Type: Improvement
> Reporter: Kadir OZDEMIR
> Assignee: Kadir OZDEMIR
> Priority: Major
> Fix For: 4.16.0
>
> Attachments: PHOENIX-5998.4.x.001.patch, PHOENIX-5998.4.x.002.patch,
> PHOENIX-5998.4.x.003.patch
>
>
> Phoenix provides the option of performing upsert select and delete query
> operations on the client or server side. This is decided by the Phoenix
> optimizer based on configuration parameters. For the server side option, the
> table operation (upsert select/delete query) is parallelized such that
> multiple table regions are scanned and the mutations derived from these scans
> can also be executed in parallel on the server side. However, currently there
> is no paging capability and the server side operation can take long enough
> to lead to HBase client timeouts. When this happens, Phoenix can return failure
> to its applications and the rest of the parallel scans and mutations on the
> server side can still continue since Phoenix has no mechanism in place to
> stop these operations before returning failure to applications. This can
> create unexpected race conditions between these left-over operations and the
> new operations issued by applications. Putting a limit on the number of rows
> to be processed within a single RPC call (i.e., the next operation on the
> scanner) on the server side using Phoenix-level paging is highly desirable
> and a required step to prevent the possible race conditions. This paging
> mechanism has already been implemented for index rebuild and verification
> operations and has proven effective at preventing timeouts. This paging can be
> implemented for all server side operations including aggregates, upsert
> selects, delete queries and so on.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)