This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 68239ffcb568cc0dbcb45ce7f1aa3f215fd1e652 Author: Saksham Gangwar <sakshamgangwar@Sakshams-iMac.local> AuthorDate: Fri Aug 14 16:33:58 2020 -0700 PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed --- .../phoenix/query/MaxConcurrentConnectionsIT.java | 41 +++++++++++++++++++--- .../compile/MutatingParallelIteratorFactory.java | 4 +++ .../org/apache/phoenix/jdbc/PhoenixConnection.java | 24 +++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java index 7da276c..611ef89 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.query; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.util.DelayedRegionServer; import org.apache.phoenix.util.PhoenixRuntime; @@ -31,22 +32,17 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; import java.util.Properties; -import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED; import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS; import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS; -import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -129,4 +125,39 @@ public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT { } } + + @Test public void testClosedChildConnectionsRemovedFromParentQueue() throws SQLException { + String tableName = generateUniqueName(); + String connectionUrl = getUniqueUrl(); + int NUMBER_OF_ROWS = 10; + String ddl = "CREATE TABLE " + tableName + " (V BIGINT PRIMARY KEY, K BIGINT)"; + Properties props = new Properties(); + props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(10)); + props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(10)); + try (Connection conn = DriverManager.getConnection(connectionUrl, props); + Statement statement = conn.createStatement()) { + statement.execute(ddl); + } + PhoenixConnection + connection = + (PhoenixConnection) DriverManager.getConnection(connectionUrl, props); + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + connection.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES (" + i + ", " + i + ")"); + connection.commit(); + } + connection.setAutoCommit(false); + try { + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + connection.createStatement() + .execute("DELETE FROM " + tableName + " WHERE K = " + i); + } + } catch (SQLException e) { + fail(); + } finally { + connection.close(); + } + // All 10 child connections should be removed successfully from the queue + assertEquals(0, connection.getChildConnectionsCount()); + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index 7113867..2c5af7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -119,6 +119,8 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato MutatingParallelIteratorFactory.this.connection.getMutationState() .join(finalState); } finally { + //Removing to be closed connection from the parent connection queue. + connection.removeChildConnection(clonedConnection); clonedConnection.close(); } } @@ -131,6 +133,8 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato } catch (Throwable ex) { // Catch just to make sure we close the cloned connection and then rethrow try { + //Removing to be closed connection from the parent connection queue. + connection.removeChildConnection(clonedConnection); // closeQuietly only handles IOException clonedConnection.close(); } catch (SQLException sqlEx) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index ae47e7d..dab4c6a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -478,6 +478,30 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea childConnections.add(connection); } + /** + * Method to remove child connection from childConnections Queue + * + * @param connection + */ + public void removeChildConnection(PhoenixConnection connection) { + if (childConnections != null) { + childConnections.remove(connection); + } + } + + /** + * Method to fetch child connections count from childConnections Queue + * + * @return int count + */ + @VisibleForTesting + public int getChildConnectionsCount() { + if (childConnections != null) { + return childConnections.size(); + } + return 0; + } + public Sampler<?> getSampler() { return this.sampler; }