This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 93c2d2f PHOENIX-6658 Replace HRegion.get() calls 93c2d2f is described below commit 93c2d2fde43e97206ee760b25a1993e8c72d92b3 Author: Istvan Toth <st...@apache.org> AuthorDate: Mon Feb 28 18:53:19 2022 +0100 PHOENIX-6658 Replace HRegion.get() calls --- .../phoenix/coprocessor/MetaDataEndpointImpl.java | 33 ++++++---- .../coprocessor/SequenceRegionObserver.java | 71 ++++++++++++---------- .../apache/phoenix/index/PhoenixIndexBuilder.java | 13 +++- .../apache/phoenix/util/PhoenixKeyValueUtil.java | 27 ++++++-- 4 files changed, 92 insertions(+), 52 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 59961af..bc790a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -3874,19 +3874,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); } - try { - Get get = new Get(key); - get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES); - get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES); - get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); - get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES); - Result currentResult = region.get(get); - if (currentResult.rawCells().length == 0) { + + Get get = new Get(key); + get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES); + get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES); + get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); + get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES); + try (RegionScanner scanner = region.getScanner(new Scan(get))) { + List<Cell> cells = new ArrayList<>(); + scanner.next(cells); + if (cells.isEmpty()) { builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND); builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); done.run(builder.build()); return; } + Result currentResult = Result.create(cells); Cell dataTableKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES); Cell currentStateKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES); Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); @@ -3988,10 +3991,14 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); if (currentState == PIndexState.PENDING_DISABLE) { if (newState == PIndexState.ACTIVE) { //before making index ACTIVE check if all clients succeed otherwise keep it PENDING_DISABLE - byte[] count = region - .get(new Get(key).addColumn(TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES)) - .getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES); + byte[] count; + try (RegionScanner countScanner = region.getScanner(new Scan(get))) { + List<Cell> countCells = new ArrayList<>(); + scanner.next(countCells); + count = Result.create(countCells) + .getValue(TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES); + } if (count != null && Bytes.toLong(count) != 0) { newState = PIndexState.PENDING_DISABLE; newKVs.remove(disableTimeStampKVIndex); @@ -4415,7 +4422,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); Region region = env.getRegion(); Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp); List<Cell> results = Lists.newArrayList(); - try (RegionScanner scanner = region.getScanner(scan);) { + try (RegionScanner scanner = region.getScanner(scan)) { scanner.next(results); if (results.isEmpty()) { // Should not be possible return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 8e48899..33e7bf0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -36,13 +37,14 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -121,32 +123,35 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor region.startRegionOperation(); try { ServerUtil.acquireLock(region, row, locks); - try { - long maxTimestamp = tr.getMax(); - boolean validateOnly = true; - Get get = new Get(row); - get.setTimeRange(tr.getMin(), tr.getMax()); - for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) { - byte[] cf = entry.getKey(); - for (Cell cq : entry.getValue()) { - long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset()); - get.addColumn(cf, CellUtil.cloneQualifier(cq)); - long cellTimestamp = cq.getTimestamp(); - // Workaround HBASE-15698 by using the lowest of the timestamps found - // on the Increment or any of its Cells. - if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) { - maxTimestamp = cellTimestamp; - get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp); - } - validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value); + long maxTimestamp = tr.getMax(); + boolean validateOnly = true; + Get get = new Get(row); + get.setTimeRange(tr.getMin(), tr.getMax()); + for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) { + byte[] cf = entry.getKey(); + for (Cell cq : entry.getValue()) { + long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset()); + get.addColumn(cf, CellUtil.cloneQualifier(cq)); + long cellTimestamp = cq.getTimestamp(); + // Workaround HBASE-15698 by using the lowest of the timestamps found + // on the Increment or any of its Cells. + if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) { + maxTimestamp = cellTimestamp; + get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp); } + validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value); } - Result result = region.get(get); - if (result.isEmpty()) { + } + try (RegionScanner scanner = region.getScanner(new Scan(get))) { + List<Cell> currentCells = new ArrayList<>(); + scanner.next(currentCells); + // These cells are returned by this method, and may be backed by ByteBuffers + // that we free when the RegionScanner is closed on return + PhoenixKeyValueUtil.maybeCopyCellList(currentCells); + if (currentCells.isEmpty()) { return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode()); } - - + Result result = Result.create(currentCells); Cell currentValueKV = Sequence.getCurrentValueKV(result); Cell incrementByKV = Sequence.getIncrementByKV(result); Cell cacheSizeKV = Sequence.getCacheSizeKV(result); @@ -379,15 +384,17 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor region.startRegionOperation(); try { ServerUtil.acquireLock(region, row, locks); - try { - byte[] family = CellUtil.cloneFamily(keyValue); - byte[] qualifier = CellUtil.cloneQualifier(keyValue); - Get get = new Get(row); - get.setTimeRange(minGetTimestamp, maxGetTimestamp); - get.addColumn(family, qualifier); - Result result = region.get(get); - if (result.isEmpty()) { + byte[] family = CellUtil.cloneFamily(keyValue); + byte[] qualifier = CellUtil.cloneQualifier(keyValue); + + Get get = new Get(row); + get.setTimeRange(minGetTimestamp, maxGetTimestamp); + get.addColumn(family, qualifier); + try (RegionScanner scanner = region.getScanner(new Scan(get))) { + List<Cell> cells = new ArrayList<>(); + scanner.next(cells); + if (cells.isEmpty()) { if (op == Sequence.MetaOp.DROP_SEQUENCE || op == Sequence.MetaOp.RETURN_SEQUENCE) { return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode()); } @@ -399,7 +406,7 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor Mutation m = null; switch (op) { case RETURN_SEQUENCE: - KeyValue currentValueKV = PhoenixKeyValueUtil.maybeCopyCell(result.rawCells()[0]); + KeyValue currentValueKV = PhoenixKeyValueUtil.maybeCopyCell(cells.get(0)); long expectedValue = PLong.INSTANCE.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, SortOrder.getDefault()); long value = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index 7cef8dc..1ab132d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -23,6 +23,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -37,12 +38,13 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; @@ -150,8 +152,13 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { final Get get = new Get(rowKey); if (isDupKeyIgnore(opBytes)) { get.setFilter(new FirstKeyOnlyFilter()); - Result result = this.env.getRegion().get(get); - return result.isEmpty() ? convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList(); + try (RegionScanner scanner = this.env.getRegion().getScanner(new Scan(get))) { + List<Cell> cells = new ArrayList<>(); + scanner.next(cells); + return cells.isEmpty() + ? convertIncrementToPutInSingletonList(inc) + : Collections.<Mutation>emptyList(); + } } ByteArrayInputStream stream = new ByteArrayInputStream(opBytes); DataInputStream input = new DataInputStream(stream); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java index 0af2148..f553c85 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -21,9 +21,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell.Type; import org.apache.hadoop.hbase.CellBuilderFactory; @@ -34,16 +36,12 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compat.hbase.CompatUtil; import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; /** * @@ -216,12 +214,33 @@ public class PhoenixKeyValueUtil { // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something // that will likely be removed at some point in time. if (c == null) return null; + // TODO Do we really want to return only KeyValues, or would it be enough to + // copy ByteBufferExtendedCells to heap ? + // i.e can we avoid copying on-heap cells like BufferedDataBlockEncoder.OnheapDecodedCell ? if (c instanceof KeyValue) { return (KeyValue) c; } return KeyValueUtil.copyToNewKeyValue(c); } + /** + * Copy all Off-Heap cells to KeyValues + * The input list is modified. + * + * @param cells is modified in place + * @return the modified list (optional, input list is modified in place) + */ + public static List<Cell> maybeCopyCellList(List<Cell> cells) { + ListIterator<Cell> cellsIt = cells.listIterator(); + while (cellsIt.hasNext()) { + Cell c = cellsIt.next(); + if (c instanceof ByteBufferExtendedCell) { + cellsIt.set(KeyValueUtil.copyToNewKeyValue(c)); + } + } + return cells; + } + private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { long size = 0; // iterate over rows