This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cb18429c7 PHOENIX-6834 Use Pooled HConnection for Server Side Upsert Select
5cb18429c7 is described below

commit 5cb18429c770d734bf4452c40f0ce1cf70719210
Author: Istvan Toth <st...@apache.org>
AuthorDate: Wed Nov 23 17:31:07 2022 +0100

    PHOENIX-6834 Use Pooled HConnection for Server Side Upsert Select
---
 .../UngroupedAggregateRegionScanner.java           | 41 +++++++++++-----------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index e8ec6d758e..bd0bb1e054 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -34,9 +34,9 @@ import static 
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
-import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
 import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
 import static org.apache.phoenix.util.ScanUtil.isDummy;
+import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -45,18 +45,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
-import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -69,6 +63,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
@@ -77,9 +72,13 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.memory.InsufficientMemoryException;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.query.QueryConstants;
@@ -102,20 +101,18 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -164,7 +161,7 @@ public class UngroupedAggregateRegionScanner extends 
BaseRegionScanner {
     private boolean incrScanRefCount = false;
     private byte[] indexMaintainersPtr;
     private boolean useIndexProto;
-    private Connection targetHConn = null;
+    private PhoenixConnection targetPConn = null;
 
     public UngroupedAggregateRegionScanner(final 
ObserverContext<RegionCoprocessorEnvironment> c,
                                            final RegionScanner innerScanner, 
final Region region, final Scan scan,
@@ -231,9 +228,13 @@ public class UngroupedAggregateRegionScanner extends 
BaseRegionScanner {
         if (upsertSelectTable != null) {
             isUpsert = true;
             projectedTable = deserializeTable(upsertSelectTable);
-            targetHConn = 
ConnectionFactory.createConnection(ungroupedAggregateRegionObserver.getUpsertSelectConfig());
-            targetHTable = targetHConn.getTable(
-                    
TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
+            targetPConn =
+                    ((PhoenixConnection) QueryUtil.getConnectionOnServer(
+                        
ungroupedAggregateRegionObserver.getUpsertSelectConfig()));
+            targetHTable =
+                    targetPConn.getQueryServices()
+                            
.getTable(projectedTable.getPhysicalName().getBytes());
+            // TODO Can't we just close the PhoenixConnection immediately here 
?
             selectExpressions = 
deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
             isPKChanging = ExpressionUtil.isPkPositionChanging(new 
TableRef(projectedTable), selectExpressions);
@@ -313,11 +314,11 @@ public class UngroupedAggregateRegionScanner extends 
BaseRegionScanner {
                     LOGGER.error("Closing table: " + targetHTable + " failed: 
", e);
                 }
             }
-            if (targetHConn != null) {
+            if (targetPConn != null) {
                 try {
-                    targetHConn.close();
-                } catch (IOException e) {
-                    LOGGER.error("Closing connection: " + targetHConn + " 
failed: ", e);
+                    targetPConn.close();
+                } catch (SQLException e) {
+                    LOGGER.error("Closing connection: " + targetPConn + " 
failed: ", e);
                 }
             }
         } finally {

Reply via email to