This is an automated email from the ASF dual-hosted git repository.
jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 9380d8ef7c PHOENIX-7638 Creating a large number of views leads to OS
thread exhaustion (#2273)
9380d8ef7c is described below
commit 9380d8ef7c2a05079917711f4a402c727db8048a
Author: Jacob Isaac <[email protected]>
AuthorDate: Wed Aug 20 10:11:58 2025 -0700
PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion
(#2273)
---
.../org/apache/phoenix/schema/MetaDataClient.java | 6 +-
.../java/org/apache/phoenix/util/TupleUtil.java | 7 +-
.../coprocessor/PhoenixRegionServerEndpoint.java | 3 +
.../java/org/apache/phoenix/util/ServerUtil.java | 13 +-
.../end2end/MetadataServerConnectionsIT.java | 181 +++++++++++++++++++++
.../org/apache/phoenix/end2end/UpsertValuesIT.java | 4 +-
6 files changed, 193 insertions(+), 21 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ae2248b86f..fdaeed259d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4745,9 +4745,9 @@ public class MetaDataClient {
/**
* To check if TTL is defined at any of the child below we are
checking it at
* {@link
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List,
ColumnMutator, int, PTable, PTable, boolean)}
- * level where in function {@link
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
- * validateIfMutationAllowedOnParent(PTable, List, PTableType, long,
byte[], byte[],
- * byte[], List, int)} we are already traversing through
allDescendantViews.
+ * level where in function
+ * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[],
byte[], byte[], List, int)}
+ * we are already traversing through allDescendantViews.
*/
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
index b8f6a769d1..91e3da6e6d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
@@ -33,7 +33,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -237,9 +236,9 @@ public class TupleUtil {
* @throws SQLException If any SQL operation fails.
*/
@SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE",
- justification = "Tge statement object needs to be kept open for the
returned RS to be "
- + "valid, however this is acceptable as not
callingPhoenixStatement.close() "
- + "causes no resource leak")
+ justification = "Tge statement object needs to be kept open for the
returned RS to be "
+ + "valid, however this is acceptable as not
callingPhoenixStatement.close() "
+ + "causes no resource leak")
public static ResultSet getResultSet(Tuple toProject, TableName tableName,
Connection conn,
boolean withPrefetch) throws SQLException {
if (tableName == null) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index bdddf0b10b..4444118583 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,10 +67,12 @@ public class PhoenixRegionServerEndpoint extends
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
+ RegionServerCoprocessor.super.stop(env);
if (uncoveredIndexThreadPool != null) {
uncoveredIndexThreadPool
.stop("PhoenixRegionServerEndpoint is stopping. Shutting down
uncovered index threadpool.");
}
+ ServerUtil.ConnectionFactory.shutdown();
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
index c46fe7f977..4a409ac69e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -53,14 +52,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerUtil {
- private static final int COPROCESSOR_SCAN_WORKS =
VersionUtil.encodeVersion("0.98.6");
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerUtil.class);
private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
- private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment
env) {
- return (VersionUtil.encodeVersion(env.getHBaseVersion()) >=
COPROCESSOR_SCAN_WORKS);
- }
-
public static boolean hasCoprocessor(RegionCoprocessorEnvironment env,
String CoprocessorClassName) {
Collection<CoprocessorDescriptor> coprocessors =
@@ -99,17 +93,11 @@ public class ServerUtil {
public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment
env,
Table writerTable) throws IOException {
- if (coprocessorScanWorks(env)) {
- return writerTable;
- }
return getTableFromSingletonPool(env, writerTable.getName());
}
public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment
env,
TableName tableName) throws IOException {
- if (coprocessorScanWorks(env)) {
- return env.getConnection().getTable(tableName);
- }
return getTableFromSingletonPool(env, tableName);
}
@@ -222,6 +210,7 @@ public class ServerUtil {
public static void shutdown() {
synchronized (ConnectionFactory.class) {
+ LOGGER.info("Closing ServerUtil.ConnectionFactory connections");
for (Connection connection : connections.values()) {
try {
connection.close();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
index f24a3ba860..ecebf38016 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
@@ -18,15 +18,34 @@
package org.apache.phoenix.end2end;
import static
org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static
org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_CORE_SIZE;
+import static
org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_MAX_SIZE;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -34,8 +53,10 @@ import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -64,6 +85,105 @@ public class MetadataServerConnectionsIT extends BaseTest {
}
public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+ private RegionCoprocessorEnvironment env;
+
+ /**
+ * Sets the static {@code testCreateView} flag of this test endpoint.
+ * NOTE(review): nothing in the visible part of this change reads the flag - confirm it is
+ * still needed.
+ * @param testCreateView the new value of the flag
+ */
+ public static void setTestCreateView(boolean testCreateView) {
+ TestMetaDataEndpointImpl.testCreateView = testCreateView;
+ }
+
+ private static volatile boolean testCreateView = false;
+
+    /**
+     * Starts the parent endpoint and keeps a reference to the region coprocessor environment.
+     * @param env the environment this coprocessor is loaded into
+     * @throws IOException if the parent fails to start, or (as a {@link CoprocessorException})
+     *           if {@code env} is not a {@link RegionCoprocessorEnvironment}
+     */
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+      super.start(env);
+      if (!(env instanceof RegionCoprocessorEnvironment)) {
+        throw new CoprocessorException("Must be loaded on a table region!");
+      }
+      this.env = (RegionCoprocessorEnvironment) env;
+    }
+
+    /**
+     * Runs the regular create-table routine, then checks that the {@link Table} handed out for
+     * coprocessor scans uses the same thread pool as the singleton server connection.
+     * <p>
+     * The pools are read reflectively from the {@code batchPool} field of
+     * {@link ConnectionImplementation} and the {@code pool} field of {@link HTable}. If the
+     * inspection throws one of the caught exceptions, it is logged and reported to the caller
+     * through {@code controller} as a {@link DoNotRetryIOException}.
+     * @param controller RPC controller that receives the failure, if any
+     * @param request the create-table request; its mutations supply the schema and table names
+     * @param done callback passed to the parent implementation and also run on failure
+     */
+    @Override
+    public void createTable(RpcController controller, MetaDataProtos.CreateTableRequest request,
+      RpcCallback<MetaDataProtos.MetaDataResponse> done) {
+      // Invoke the actual create table routine
+      super.createTable(controller, request, done);
+
+      byte[][] rowKeyMetaData = new byte[3][];
+
+      // Get the singleton connection for testing
+      org.apache.hadoop.hbase.client.Connection conn = ServerUtil.ConnectionFactory
+        .getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env);
+      try {
+        // Get the current table creation details
+        List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+
+        ThreadPoolExecutor ctpe = null;
+        ThreadPoolExecutor htpe = null;
+
+        // Get the thread pool executor from the connection.
+        if (conn instanceof ConnectionImplementation) {
+          Field batchPoolField = ConnectionImplementation.class.getDeclaredField("batchPool");
+          batchPoolField.setAccessible(true);
+          ctpe = (ThreadPoolExecutor) batchPoolField.get(conn);
+          LOGGER.debug("ConnectionImplementation Thread pool info :{}", ctpe);
+        }
+
+        // Get the thread pool executor from the HTable.
+        Table hTable = ServerUtil.getHTableForCoprocessorScan(env, TableName.valueOf(fullTableName));
+        if (hTable instanceof HTable) {
+          Field poolField = hTable.getClass().getDeclaredField("pool");
+          poolField.setAccessible(true);
+          htpe = (ThreadPoolExecutor) poolField.get(hTable);
+          LOGGER.debug("HTable Thread pool info :{}", htpe);
+          // Assert the HTable thread pool config match the Connection pool configs.
+          // Since we are not overriding any defaults, it should match the defaults.
+          assertEquals(DEFAULT_HCONNECTION_POOL_MAX_SIZE, htpe.getMaximumPoolSize());
+          assertEquals(DEFAULT_HCONNECTION_POOL_CORE_SIZE, htpe.getCorePoolSize());
+          LOGGER.debug("HTable threadpool info {}, {}, {}, {}", htpe.getCorePoolSize(),
+            htpe.getMaximumPoolSize(), htpe.getQueue().remainingCapacity(),
+            htpe.getKeepAliveTime(TimeUnit.SECONDS));
+
+          // activeCount() is only an estimate: enumerate() reports how many slots it really
+          // filled and leaves the rest null, so only the filled prefix may be inspected.
+          Thread[] threads = new Thread[Thread.activeCount()];
+          int enumerated = Thread.enumerate(threads);
+          long hTablePoolCount = Arrays.stream(threads, 0, enumerated)
+            .filter(thread -> "htable-pool-0".equals(thread.getName())).count();
+          // Assert no default HTable threadpools are created.
+          assertEquals(0, hTablePoolCount);
+          LOGGER.debug("htable-pool-0 threads {}", hTablePoolCount);
+        }
+        // Assert that the threadpool from Connection and HTable are the same.
+        assertEquals(ctpe, htpe);
+      } catch (RuntimeException | NoSuchFieldException | IllegalAccessException
+        | IOException t) {
+        // handle cases that an IOE is wrapped inside a RuntimeException
+        // like HTableInterface#createHTableInterface
+        MetaDataProtos.MetaDataResponse.Builder builder =
+          MetaDataProtos.MetaDataResponse.newBuilder();
+
+        // Log the cause as well; without it the failure cannot be diagnosed from the logs.
+        LOGGER.error("This is unexpected", t);
+        ProtobufUtil.setControllerException(controller,
+          ClientUtil.createIOException(SchemaUtil
+            .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+            .toString(), new DoNotRetryIOException("Not allowed")));
+        // NOTE(review): super.createTable was handed the same callback above - confirm that
+        // running it a second time here is intended.
+        done.run(builder.build());
+      }
+    }
@Override
public void getVersion(RpcController controller,
MetaDataProtos.GetVersionRequest request,
@@ -81,6 +201,67 @@ public class MetadataServerConnectionsIT extends BaseTest {
}
}
+  /**
+   * Creates multilevel tenant views and view indexes for {@code NUM_VIEWS} tenants while
+   * {@link TestMetaDataEndpointImpl} replaces {@link MetaDataEndpointImpl} on SYSTEM.CATALOG,
+   * so that every create-table call runs the endpoint's thread pool checks.
+   * <p>
+   * The original coprocessor is restored in a {@code finally} block so that a failure in this
+   * test does not leave the test endpoint installed for other tests.
+   */
+  @Test
+  public void testViewCreationAndServerConnections() throws Throwable {
+    final String tableName = generateUniqueName();
+    final String view01 = "v01_" + tableName;
+    final String view02 = "v02_" + tableName;
+    final String index_view01 = "idx_v01_" + tableName;
+    final String index_view02 = "idx_v02_" + tableName;
+    final String index_view03 = "idx_v03_" + tableName;
+    final String index_view04 = "idx_v04_" + tableName;
+    final int NUM_VIEWS = 50;
+
+    TestMetaDataEndpointImpl.setTestCreateView(true);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+      TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+      try {
+        try (Statement stmt = conn.createStatement()) {
+          stmt.execute("CREATE TABLE " + tableName
+            + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 VARCHAR,"
+            + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+            + " UPDATE_CACHE_FREQUENCY=ALWAYS, MULTI_TENANT=true");
+          conn.commit();
+        }
+
+        for (int i = 0; i < NUM_VIEWS; i++) {
+          Properties props = new Properties();
+          String viewTenantId = String.format("00T%012d", i);
+          props.setProperty(TENANT_ID_ATTRIB, viewTenantId);
+          try (Connection tConn = DriverManager.getConnection(getUrl(), props)) {
+            // Create multilevel tenant views
+            try (Statement viewStmt = tConn.createStatement()) {
+              viewStmt.execute("CREATE VIEW " + view01
+                + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM " + tableName
+                + " WHERE COL2 = 'col2'");
+              viewStmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 VARCHAR)"
+                + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 'vcol1'");
+              tConn.commit();
+            }
+
+            // Create multilevel tenant indexes
+            try (Statement indexStmt = tConn.createStatement()) {
+              indexStmt.execute("CREATE INDEX " + index_view01 + " ON " + view01
+                + " (COL5) INCLUDE " + "(COL1, COL2, COL3)");
+              indexStmt.execute("CREATE INDEX " + index_view02 + " ON " + view02
+                + " (COL6) INCLUDE " + "(COL1, COL2, COL3)");
+              indexStmt.execute("CREATE INDEX " + index_view03 + " ON " + view01
+                + " (COL5) INCLUDE " + "(COL2, COL1)");
+              indexStmt.execute("CREATE INDEX " + index_view04 + " ON " + view02
+                + " (COL6) INCLUDE " + "(COL2, COL1)");
+              tConn.commit();
+            }
+          }
+        }
+      } finally {
+        // Always put the real endpoint back, even when the test body fails.
+        TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+        TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+      }
+    }
+  }
+
@Test
public void testConnectionFromMetadataServer() throws Throwable {
final String tableName = generateUniqueName();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index e2a6ac2cee..08da6be7ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -125,8 +125,8 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT
{
assertEquals("a", rs.getString(1));
assertEquals("b", rs.getString(2));
assertFalse(rs.next());
- stmt = conn.prepareStatement("UPSERT INTO " + tableName
- + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())");
+ stmt = conn.prepareStatement(
+ "UPSERT INTO " + tableName + " (inst,host,\"DATE\")
VALUES(?,'b',CURRENT_DATE())");
stmt.setString(1, "a");
stmt.execute();
rs = stmt.getResultSet();