This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e33b4433 PHOENIX-7638 Creating a large number of views leads to OS 
thread exhaustion (#2190)
d5e33b4433 is described below

commit d5e33b443338a6e96ae79d7657cfc6943a4379b1
Author: Jacob Isaac <[email protected]>
AuthorDate: Tue Jun 24 18:21:13 2025 -0700

    PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion 
(#2190)
---
 .../coprocessor/PhoenixRegionServerEndpoint.java   |   7 +
 .../java/org/apache/phoenix/util/ServerUtil.java   |  12 +-
 .../end2end/MetadataServerConnectionsIT.java       | 194 ++++++++++++++++++++-
 3 files changed, 197 insertions(+), 16 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 22fa84c4a6..97f71d0862 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,12 @@ public class PhoenixRegionServerEndpoint
                                 .getInstance().getMetadataCachingSource();
     }
 
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        RegionServerCoprocessor.super.stop(env);
+        ServerUtil.ConnectionFactory.shutdown();
+    }
+
     @Override
     public void validateLastDDLTimestamp(RpcController controller,
             RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request,
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java 
b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
index c88e7c12d3..00a9d35414 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -55,14 +55,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ServerUtil {
-    private static final int COPROCESSOR_SCAN_WORKS = 
VersionUtil.encodeVersion("0.98.6");
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerUtil.class);
     private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
 
-    private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment 
env) {
-        return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= 
COPROCESSOR_SCAN_WORKS);
-    }
-
     public static boolean hasCoprocessor(RegionCoprocessorEnvironment env,
                                          String CoprocessorClassName) {
         Collection<CoprocessorDescriptor> coprocessors =
@@ -98,16 +93,10 @@ public class ServerUtil {
     
     public static Table getHTableForCoprocessorScan 
(RegionCoprocessorEnvironment env,
                                                                Table 
writerTable) throws IOException {
-        if (coprocessorScanWorks(env)) {
-            return writerTable;
-        }
         return getTableFromSingletonPool(env, writerTable.getName());
     }
     
     public static Table getHTableForCoprocessorScan 
(RegionCoprocessorEnvironment env, TableName tableName) throws IOException {
-        if (coprocessorScanWorks(env)) {
-            return env.getConnection().getTable(tableName);
-        }
         return getTableFromSingletonPool(env, tableName);
     }
 
@@ -217,6 +206,7 @@ public class ServerUtil {
         
         public static void shutdown() {
             synchronized (ConnectionFactory.class) {
+                LOGGER.info("Closing ServerUtil.ConnectionFactory 
connections");
                 for (Connection connection : connections.values()) {
                     try {
                         connection.close();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
index 63fd4e7c43..f773b0c45b 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
@@ -19,7 +19,15 @@ package org.apache.phoenix.end2end;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -28,8 +36,10 @@ import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -38,13 +48,25 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static 
org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_CORE_SIZE;
+import static 
org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_MAX_SIZE;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests to ensure connection creation by metadata coproc does not need to make
@@ -65,6 +87,106 @@ public class MetadataServerConnectionsIT extends BaseTest {
     }
 
     public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+        private RegionCoprocessorEnvironment env;
+
+        public static void setTestCreateView(boolean testCreateView) {
+            TestMetaDataEndpointImpl.testCreateView = testCreateView;
+        }
+
+        private static volatile boolean testCreateView = false;
+
+        @Override
+        public void start(CoprocessorEnvironment env) throws IOException {
+            super.start(env);
+            if (env instanceof RegionCoprocessorEnvironment) {
+                this.env = (RegionCoprocessorEnvironment) env;
+            } else {
+                throw new CoprocessorException("Must be loaded on a table 
region!");
+            }
+        }
+
+        @Override
+        public void createTable(RpcController controller, 
MetaDataProtos.CreateTableRequest request,
+                RpcCallback<MetaDataProtos.MetaDataResponse> done) {
+            // Invoke the actual create table routine
+            super.createTable(controller, request, done);
+
+            byte[][] rowKeyMetaData = new byte[3][];
+            byte[] schemaName = null;
+            byte[] tableName = null;
+            String fullTableName = null;
+
+            // Get the singleton connection for testing
+            org.apache.hadoop.hbase.client.Connection
+                    conn = 
ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION,
 env);
+            try {
+                // Get the current table creation details
+                List<Mutation> tableMetadata = 
ProtobufUtil.getMutations(request);
+                MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, 
rowKeyMetaData);
+                schemaName = 
rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+                tableName = 
rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+                fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+
+                ThreadPoolExecutor ctpe = null;
+                ThreadPoolExecutor htpe = null;
+
+                // Get the thread pool executor from the connection.
+                if (conn instanceof ConnectionImplementation) {
+                    ConnectionImplementation connImpl = 
((ConnectionImplementation) conn);
+                    Field props = null;
+                    props = 
ConnectionImplementation.class.getDeclaredField("batchPool");
+                    props.setAccessible(true);
+                    ctpe = (ThreadPoolExecutor) props.get(connImpl);
+                    LOGGER.debug("ConnectionImplementation Thread pool info :" 
+ ctpe.toString());
+
+                }
+
+                // Get the thread pool executor from the HTable.
+                Table hTable = ServerUtil.getHTableForCoprocessorScan(env, 
TableName.valueOf(fullTableName));
+                if (hTable instanceof HTable) {
+                    HTable testTable = (HTable) hTable;
+                    Field props = 
testTable.getClass().getDeclaredField("pool");
+                    props.setAccessible(true);
+                    htpe = ((ThreadPoolExecutor) props.get(hTable));
+                    LOGGER.debug("HTable Thread pool info :" + 
htpe.toString());
+                    // Assert the HTable thread pool config match the 
Connection pool configs.
+                    // Since we are not overriding any defaults, it should 
match the defaults.
+                    assertEquals(htpe.getMaximumPoolSize(), 
DEFAULT_HCONNECTION_POOL_MAX_SIZE);
+                    assertEquals(htpe.getCorePoolSize(), 
DEFAULT_HCONNECTION_POOL_CORE_SIZE);
+                    LOGGER.debug("HTable threadpool info {}, {}, {}, {}",
+                            htpe.getCorePoolSize(),
+                            htpe.getMaximumPoolSize(),
+                            htpe.getQueue().remainingCapacity(),
+                            htpe.getKeepAliveTime(TimeUnit.SECONDS));
+
+                    int count = Thread.activeCount();
+                    Thread[] th = new Thread[count];
+                    // returns the number of threads put into the array
+                    Thread.enumerate(th);
+                    long hTablePoolCount = Arrays.stream(th).filter(s -> 
s.getName().equals("htable-pool-0")).count();
+                    // Assert no default HTable threadpools are created.
+                    assertEquals(0, hTablePoolCount);
+                    LOGGER.debug("htable-pool-0 threads {}", hTablePoolCount);
+                }
+                // Assert that the threadpool from Connection and HTable are 
the same.
+                assertEquals(ctpe, htpe);
+            } catch (RuntimeException | NoSuchFieldException | 
IllegalAccessException | IOException t) {
+                // handle cases that an IOE is wrapped inside a 
RuntimeException like HTableInterface#createHTableInterface
+                MetaDataProtos.MetaDataResponse.Builder builder =
+                        MetaDataProtos.MetaDataResponse.newBuilder();
+
+                LOGGER.error("This is unexpected");
+                ProtobufUtil.setControllerException(controller,
+                        ClientUtil.createIOException(
+                                SchemaUtil.getPhysicalTableName(
+                                                
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+                                        .toString(),
+                                new DoNotRetryIOException("Not allowed")));
+                done.run(builder.build());
+
+            }
+
+        }
 
         @Override
         public void getVersion(RpcController controller, 
MetaDataProtos.GetVersionRequest request,
@@ -83,6 +205,68 @@ public class MetadataServerConnectionsIT extends BaseTest {
         }
     }
 
+    @Test
+    public void testViewCreationAndServerConnections() throws Throwable {
+        final String tableName = generateUniqueName();
+        final String view01 = "v01_" + tableName;
+        final String view02 = "v02_" + tableName;
+        final String index_view01 = "idx_v01_" + tableName;
+        final String index_view02 = "idx_v02_" + tableName;
+        final String index_view03 = "idx_v03_" + tableName;
+        final String index_view04 = "idx_v04_" + tableName;
+        final int NUM_VIEWS = 50;
+
+        TestMetaDataEndpointImpl.setTestCreateView(true);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", 
MetaDataEndpointImpl.class);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", 
TestMetaDataEndpointImpl.class);
+
+            final Statement stmt = conn.createStatement();
+
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 
VARCHAR,"
+                    + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+                    + " UPDATE_CACHE_FREQUENCY=ALWAYS, MULTI_TENANT=true");
+            conn.commit();
+
+            for (int i=0; i< NUM_VIEWS; i++) {
+                Properties props = new Properties();
+                String viewTenantId = String.format("00T%012d", i);
+                props.setProperty(TENANT_ID_ATTRIB, viewTenantId);
+                // Create multilevel tenant views
+                try (Connection tConn = DriverManager.getConnection(getUrl(), 
props)) {
+                    final Statement viewStmt = tConn.createStatement();
+                    viewStmt.execute("CREATE VIEW " + view01
+                            + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM 
" + tableName
+                            + " WHERE COL2 = 'col2'");
+
+                    viewStmt.execute("CREATE VIEW " + view02 + " (VCOL2 
CHAR(10), COL6 VARCHAR)"
+                            + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 
'vcol1'");
+                    tConn.commit();
+
+                    // Create multilevel tenant indexes
+                    final Statement indexStmt = tConn.createStatement();
+                    indexStmt.execute("CREATE INDEX " + index_view01 + " ON " 
+ view01 + " (COL5) INCLUDE "
+                            + "(COL1, COL2, COL3)");
+                    indexStmt.execute("CREATE INDEX " + index_view02 + " ON " 
+ view02 + " (COL6) INCLUDE "
+                            + "(COL1, COL2, COL3)");
+                    indexStmt.execute("CREATE INDEX " + index_view03 + " ON " 
+ view01 + " (COL5) INCLUDE "
+                            + "(COL2, COL1)");
+                    indexStmt.execute("CREATE INDEX " + index_view04 + " ON " 
+ view02 + " (COL6) INCLUDE "
+                            + "(COL2, COL1)");
+
+                    tConn.commit();
+
+                }
+
+            }
+
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", 
TestMetaDataEndpointImpl.class);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", 
MetaDataEndpointImpl.class);
+        }
+    }
+
+
     @Test
     public void testConnectionFromMetadataServer() throws Throwable {
         final String tableName = generateUniqueName();
@@ -149,11 +333,11 @@ public class MetadataServerConnectionsIT extends BaseTest 
{
             // No need to verify each row result as the primary focus of this 
test is to ensure
             // no RPC call from MetaDataEndpointImpl to MetaDataEndpointImpl 
is done while
             // creating server side connection.
-            Assert.assertTrue(rs.next());
-            Assert.assertTrue(rs.next());
-            Assert.assertTrue(rs.next());
-            Assert.assertTrue(rs.next());
-            Assert.assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertTrue(rs.next());
             Assert.assertFalse(rs.next());
 
             TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", 
TestMetaDataEndpointImpl.class);

Reply via email to