This is an automated email from the ASF dual-hosted git repository.
jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d5e33b4433 PHOENIX-7638 Creating a large number of views leads to OS
thread exhaustion (#2190)
d5e33b4433 is described below
commit d5e33b443338a6e96ae79d7657cfc6943a4379b1
Author: Jacob Isaac <[email protected]>
AuthorDate: Tue Jun 24 18:21:13 2025 -0700
PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion
(#2190)
---
.../coprocessor/PhoenixRegionServerEndpoint.java | 7 +
.../java/org/apache/phoenix/util/ServerUtil.java | 12 +-
.../end2end/MetadataServerConnectionsIT.java | 194 ++++++++++++++++++++-
3 files changed, 197 insertions(+), 16 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 22fa84c4a6..97f71d0862 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,12 @@ public class PhoenixRegionServerEndpoint
.getInstance().getMetadataCachingSource();
}
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ RegionServerCoprocessor.super.stop(env);
+ ServerUtil.ConnectionFactory.shutdown();
+ }
+
@Override
public void validateLastDDLTimestamp(RpcController controller,
RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request,
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
index c88e7c12d3..00a9d35414 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -55,14 +55,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerUtil {
- private static final int COPROCESSOR_SCAN_WORKS =
VersionUtil.encodeVersion("0.98.6");
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerUtil.class);
private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
- private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment
env) {
- return (VersionUtil.encodeVersion(env.getHBaseVersion()) >=
COPROCESSOR_SCAN_WORKS);
- }
-
public static boolean hasCoprocessor(RegionCoprocessorEnvironment env,
String CoprocessorClassName) {
Collection<CoprocessorDescriptor> coprocessors =
@@ -98,16 +93,10 @@ public class ServerUtil {
public static Table getHTableForCoprocessorScan
(RegionCoprocessorEnvironment env,
Table
writerTable) throws IOException {
- if (coprocessorScanWorks(env)) {
- return writerTable;
- }
return getTableFromSingletonPool(env, writerTable.getName());
}
public static Table getHTableForCoprocessorScan
(RegionCoprocessorEnvironment env, TableName tableName) throws IOException {
- if (coprocessorScanWorks(env)) {
- return env.getConnection().getTable(tableName);
- }
return getTableFromSingletonPool(env, tableName);
}
@@ -217,6 +206,7 @@ public class ServerUtil {
public static void shutdown() {
synchronized (ConnectionFactory.class) {
+ LOGGER.info("Closing ServerUtil.ConnectionFactory
connections");
for (Connection connection : connections.values()) {
try {
connection.close();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
index 63fd4e7c43..f773b0c45b 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
@@ -19,7 +19,15 @@ package org.apache.phoenix.end2end;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -28,8 +36,10 @@ import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -38,13 +48,25 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import static
org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static
org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_CORE_SIZE;
+import static
org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_MAX_SIZE;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests to ensure connection creation by metadata coproc does not need to make
@@ -65,6 +87,106 @@ public class MetadataServerConnectionsIT extends BaseTest {
}
public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+ private RegionCoprocessorEnvironment env;
+
+ public static void setTestCreateView(boolean testCreateView) {
+ TestMetaDataEndpointImpl.testCreateView = testCreateView;
+ }
+
+ private static volatile boolean testCreateView = false;
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ super.start(env);
+ if (env instanceof RegionCoprocessorEnvironment) {
+ this.env = (RegionCoprocessorEnvironment) env;
+ } else {
+ throw new CoprocessorException("Must be loaded on a table
region!");
+ }
+ }
+
+ @Override
+ public void createTable(RpcController controller,
MetaDataProtos.CreateTableRequest request,
+ RpcCallback<MetaDataProtos.MetaDataResponse> done) {
+ // Invoke the actual create table routine
+ super.createTable(controller, request, done);
+
+ byte[][] rowKeyMetaData = new byte[3][];
+ byte[] schemaName = null;
+ byte[] tableName = null;
+ String fullTableName = null;
+
+ // Get the singleton connection for testing
+ org.apache.hadoop.hbase.client.Connection
+ conn =
ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION,
env);
+ try {
+ // Get the current table creation details
+ List<Mutation> tableMetadata =
ProtobufUtil.getMutations(request);
+ MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,
rowKeyMetaData);
+ schemaName =
rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+ tableName =
rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+
+ ThreadPoolExecutor ctpe = null;
+ ThreadPoolExecutor htpe = null;
+
+ // Get the thread pool executor from the connection.
+ if (conn instanceof ConnectionImplementation) {
+ ConnectionImplementation connImpl =
((ConnectionImplementation) conn);
+ Field props = null;
+ props =
ConnectionImplementation.class.getDeclaredField("batchPool");
+ props.setAccessible(true);
+ ctpe = (ThreadPoolExecutor) props.get(connImpl);
+ LOGGER.debug("ConnectionImplementation Thread pool info :"
+ ctpe.toString());
+
+ }
+
+ // Get the thread pool executor from the HTable.
+ Table hTable = ServerUtil.getHTableForCoprocessorScan(env,
TableName.valueOf(fullTableName));
+ if (hTable instanceof HTable) {
+ HTable testTable = (HTable) hTable;
+ Field props =
testTable.getClass().getDeclaredField("pool");
+ props.setAccessible(true);
+ htpe = ((ThreadPoolExecutor) props.get(hTable));
+ LOGGER.debug("HTable Thread pool info :" +
htpe.toString());
// Assert the HTable thread pool configs match the
Connection pool configs.
+                    // Since we are not overriding any defaults, it should
match the defaults.
+ assertEquals(htpe.getMaximumPoolSize(),
DEFAULT_HCONNECTION_POOL_MAX_SIZE);
+ assertEquals(htpe.getCorePoolSize(),
DEFAULT_HCONNECTION_POOL_CORE_SIZE);
+ LOGGER.debug("HTable threadpool info {}, {}, {}, {}",
+ htpe.getCorePoolSize(),
+ htpe.getMaximumPoolSize(),
+ htpe.getQueue().remainingCapacity(),
+ htpe.getKeepAliveTime(TimeUnit.SECONDS));
+
+ int count = Thread.activeCount();
+ Thread[] th = new Thread[count];
+ // returns the number of threads put into the array
+ Thread.enumerate(th);
+ long hTablePoolCount = Arrays.stream(th).filter(s ->
s.getName().equals("htable-pool-0")).count();
+ // Assert no default HTable threadpools are created.
+ assertEquals(0, hTablePoolCount);
+ LOGGER.debug("htable-pool-0 threads {}", hTablePoolCount);
+ }
+ // Assert that the threadpool from Connection and HTable are
the same.
+ assertEquals(ctpe, htpe);
+ } catch (RuntimeException | NoSuchFieldException |
IllegalAccessException | IOException t) {
// handle cases where an IOE is wrapped inside a
RuntimeException like HTableInterface#createHTableInterface
+ MetaDataProtos.MetaDataResponse.Builder builder =
+ MetaDataProtos.MetaDataResponse.newBuilder();
+
+ LOGGER.error("This is unexpected");
+ ProtobufUtil.setControllerException(controller,
+ ClientUtil.createIOException(
+ SchemaUtil.getPhysicalTableName(
+
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+ .toString(),
+ new DoNotRetryIOException("Not allowed")));
+ done.run(builder.build());
+
+ }
+
+ }
@Override
public void getVersion(RpcController controller,
MetaDataProtos.GetVersionRequest request,
@@ -83,6 +205,68 @@ public class MetadataServerConnectionsIT extends BaseTest {
}
}
+ @Test
+ public void testViewCreationAndServerConnections() throws Throwable {
+ final String tableName = generateUniqueName();
+ final String view01 = "v01_" + tableName;
+ final String view02 = "v02_" + tableName;
+ final String index_view01 = "idx_v01_" + tableName;
+ final String index_view02 = "idx_v02_" + tableName;
+ final String index_view03 = "idx_v03_" + tableName;
+ final String index_view04 = "idx_v04_" + tableName;
+ final int NUM_VIEWS = 50;
+
+ TestMetaDataEndpointImpl.setTestCreateView(true);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
TestMetaDataEndpointImpl.class);
+
+ final Statement stmt = conn.createStatement();
+
+ stmt.execute("CREATE TABLE " + tableName
+ + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3
VARCHAR,"
+ + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+ + " UPDATE_CACHE_FREQUENCY=ALWAYS, MULTI_TENANT=true");
+ conn.commit();
+
+ for (int i=0; i< NUM_VIEWS; i++) {
+ Properties props = new Properties();
+ String viewTenantId = String.format("00T%012d", i);
+ props.setProperty(TENANT_ID_ATTRIB, viewTenantId);
+ // Create multilevel tenant views
+ try (Connection tConn = DriverManager.getConnection(getUrl(),
props)) {
+ final Statement viewStmt = tConn.createStatement();
+ viewStmt.execute("CREATE VIEW " + view01
+ + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM
" + tableName
+ + " WHERE COL2 = 'col2'");
+
+ viewStmt.execute("CREATE VIEW " + view02 + " (VCOL2
CHAR(10), COL6 VARCHAR)"
+ + " AS SELECT * FROM " + view01 + " WHERE VCOL1 =
'vcol1'");
+ tConn.commit();
+
+ // Create multilevel tenant indexes
+ final Statement indexStmt = tConn.createStatement();
+ indexStmt.execute("CREATE INDEX " + index_view01 + " ON "
+ view01 + " (COL5) INCLUDE "
+ + "(COL1, COL2, COL3)");
+ indexStmt.execute("CREATE INDEX " + index_view02 + " ON "
+ view02 + " (COL6) INCLUDE "
+ + "(COL1, COL2, COL3)");
+ indexStmt.execute("CREATE INDEX " + index_view03 + " ON "
+ view01 + " (COL5) INCLUDE "
+ + "(COL2, COL1)");
+ indexStmt.execute("CREATE INDEX " + index_view04 + " ON "
+ view02 + " (COL6) INCLUDE "
+ + "(COL2, COL1)");
+
+ tConn.commit();
+
+ }
+
+ }
+
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
TestMetaDataEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ }
+ }
+
+
@Test
public void testConnectionFromMetadataServer() throws Throwable {
final String tableName = generateUniqueName();
@@ -149,11 +333,11 @@ public class MetadataServerConnectionsIT extends BaseTest
{
// No need to verify each row result as the primary focus of this
test is to ensure
// no RPC call from MetaDataEndpointImpl to MetaDataEndpointImpl
is done while
// creating server side connection.
- Assert.assertTrue(rs.next());
- Assert.assertTrue(rs.next());
- Assert.assertTrue(rs.next());
- Assert.assertTrue(rs.next());
- Assert.assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertTrue(rs.next());
Assert.assertFalse(rs.next());
TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
TestMetaDataEndpointImpl.class);