This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

The following commit(s) were added to refs/heads/master by this push: new 5cb18429c7 PHOENIX-6834 Use Pooled HConnection for Server Side Upsert Select 5cb18429c7 is described below commit 5cb18429c770d734bf4452c40f0ce1cf70719210 Author: Istvan Toth <st...@apache.org> AuthorDate: Wed Nov 23 17:31:07 2022 +0100 PHOENIX-6834 Use Pooled HConnection for Server Side Upsert Select --- .../UngroupedAggregateRegionScanner.java | 41 +++++++++++----------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java index e8ec6d758e..bd0bb1e054 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -34,9 +34,9 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB; import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; -import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation; import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; import static org.apache.phoenix.util.ScanUtil.isDummy; +import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation; import java.io.IOException; import java.sql.SQLException; @@ -45,18 +45,12 @@ import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; -import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import 
org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -69,6 +63,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.exception.DataExceedsCapacityException; @@ -77,9 +72,13 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.expression.aggregator.Aggregators; import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.memory.InsufficientMemoryException; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.query.QueryConstants; @@ -102,20 +101,18 @@ import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; -import org.apache.phoenix.transaction.PhoenixTransactionContext; +import 
org.apache.phoenix.thirdparty.com.google.common.collect.Sets; +import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints; import org.apache.phoenix.transaction.PhoenixTransactionProvider; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.hbase.index.ValueGetter; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixKeyValueUtil; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; @@ -164,7 +161,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { private boolean incrScanRefCount = false; private byte[] indexMaintainersPtr; private boolean useIndexProto; - private Connection targetHConn = null; + private PhoenixConnection targetPConn = null; public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner innerScanner, final Region region, final Scan scan, @@ -231,9 +228,13 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { if (upsertSelectTable != null) { isUpsert = true; projectedTable = deserializeTable(upsertSelectTable); - targetHConn = ConnectionFactory.createConnection(ungroupedAggregateRegionObserver.getUpsertSelectConfig()); - targetHTable = targetHConn.getTable( - TableName.valueOf(projectedTable.getPhysicalName().getBytes())); + targetPConn = + ((PhoenixConnection) QueryUtil.getConnectionOnServer( + ungroupedAggregateRegionObserver.getUpsertSelectConfig())); + targetHTable = + 
targetPConn.getQueryServices() + .getTable(projectedTable.getPhysicalName().getBytes()); + // TODO Can't we just close the PhoenixConnection immediately here ? selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); values = new byte[projectedTable.getPKColumns().size()][]; isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); @@ -313,11 +314,11 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { LOGGER.error("Closing table: " + targetHTable + " failed: ", e); } } - if (targetHConn != null) { + if (targetPConn != null) { try { - targetHConn.close(); - } catch (IOException e) { - LOGGER.error("Closing connection: " + targetHConn + " failed: ", e); + targetPConn.close(); + } catch (SQLException e) { + LOGGER.error("Closing connection: " + targetPConn + " failed: ", e); } } } finally {