This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ccc9ac6551 Adjust jdbc processId to statement level to avoid monitor block when run benchmarksql (#30201)
1ccc9ac6551 is described below

commit 1ccc9ac6551741aeb0e806311ca48f9f5e737a67
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Tue Feb 20 10:13:53 2024 +0800

    Adjust jdbc processId to statement level to avoid monitor block when run benchmarksql (#30201)
    
    * Adjust jdbc processId to statement level to avoid monitor block when run benchmarksql
    
    * use processId in PreparedStatement and Statement
---
 .../driver/jdbc/adapter/AbstractStatementAdapter.java       |  6 ++++++
 .../jdbc/core/connection/ShardingSphereConnection.java      |  9 ---------
 .../core/statement/ShardingSpherePreparedStatement.java     | 13 ++++++++-----
 .../driver/jdbc/core/statement/ShardingSphereStatement.java | 11 +++++++----
 .../circuit/statement/CircuitBreakerPreparedStatement.java  |  5 +++++
 5 files changed, 26 insertions(+), 18 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 0d1b4a37456..1614bb0c08e 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOper
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
@@ -48,6 +49,8 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     @Getter(AccessLevel.NONE)
     private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new 
ForceExecuteTemplate<>();
     
+    private final ProcessEngine processEngine = new ProcessEngine();
+    
     private boolean poolable;
     
     private int fetchSize;
@@ -90,6 +93,8 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     
     protected abstract StatementManager getStatementManager();
     
+    protected abstract String getProcessId();
+    
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public final void setPoolable(final boolean poolable) throws SQLException {
@@ -229,6 +234,7 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
             }
         } finally {
             getRoutedStatements().clear();
+            processEngine.disconnect(getProcessId());
         }
     }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 532a3743029..68989e91bfa 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,8 +24,6 @@ import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePrepar
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
 import 
org.apache.shardingsphere.driver.jdbc.exception.connection.ConnectionClosedException;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.transaction.api.TransactionType;
@@ -44,8 +42,6 @@ import java.sql.Statement;
  */
 public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     
-    private final ProcessEngine processEngine = new ProcessEngine();
-    
     @Getter
     private final String databaseName;
     
@@ -55,9 +51,6 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
     @Getter
     private final DriverDatabaseConnectionManager databaseConnectionManager;
     
-    @Getter
-    private final String processId;
-    
     private boolean autoCommit = true;
     
     private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@@ -70,7 +63,6 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
         this.databaseName = databaseName;
         this.contextManager = contextManager;
         databaseConnectionManager = new 
DriverDatabaseConnectionManager(databaseName, contextManager);
-        processId = processEngine.connect(new Grantee("", ""), databaseName);
     }
     
     /**
@@ -309,7 +301,6 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
     public void close() throws SQLException {
         closed = true;
         databaseConnectionManager.close();
-        processEngine.disconnect(processId);
     }
     
     private ConnectionContext getConnectionContext() {
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index a2075b54b28..5565a09b206 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -64,7 +64,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.hint.HintManager;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
@@ -150,6 +149,9 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     @Getter(AccessLevel.PROTECTED)
     private final StatementManager statementManager;
     
+    @Getter
+    private final String processId;
+    
     @Getter
     private final boolean selectContainsEnhancedTable;
     
@@ -216,6 +218,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         trafficRule = 
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
         selectContainsEnhancedTable = sqlStatementContext instanceof 
SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
         statementManager = new StatementManager();
+        processId = getProcessEngine().connect(new Grantee("", ""), 
databaseName);
     }
     
     private boolean isStatementsCacheable(final RuleMetaData 
databaseRuleMetaData) {
@@ -281,7 +284,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
         ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
+                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
         if (context.getInputGroups().isEmpty() || 
context.getInputGroups().iterator().next().getInputs().isEmpty()) {
             throw new EmptyTrafficExecutionUnitException();
         }
@@ -453,7 +456,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
     }
     
     private boolean executeWithExecutionContext(final ExecutionContext 
executionContext) throws SQLException {
@@ -529,7 +532,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", "")));
+                new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", "")));
     }
     
     @Override
@@ -721,7 +724,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             ExecutionUnit executionUnit = each.getExecutionUnit();
             executionUnits.add(executionUnit);
         }
-        
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), 
executionUnits, new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", ""))));
+        
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), 
executionUnits, new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", ""))));
         setBatchParametersForStatements(batchExecutor);
     }
     
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index e8daac199cd..7208285bb55 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -61,7 +61,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
@@ -122,6 +121,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     @Getter(AccessLevel.PROTECTED)
     private final StatementManager statementManager;
     
+    @Getter
+    private final String processId;
+    
     private final BatchStatementExecutor batchStatementExecutor;
     
     private boolean returnGeneratedKeys;
@@ -155,6 +157,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         statementManager = new StatementManager();
         batchStatementExecutor = new BatchStatementExecutor(this);
         databaseName = connection.getDatabaseName();
+        processId = getProcessEngine().connect(new Grantee("", ""), 
databaseName);
     }
     
     @Override
@@ -478,7 +481,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
         ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singletonList(executionUnit), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
+                prepareEngine.prepare(new RouteContext(), 
Collections.singletonList(executionUnit), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
         return context.getInputGroups().stream().flatMap(each -> 
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
     }
     
@@ -525,13 +528,13 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", "")));
+                new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", "")));
     }
     
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
     }
     
     private boolean executeWithExecutionContext(final ExecuteCallback 
executeCallback, final ExecutionContext executionContext) throws SQLException {
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index ac096d5e97a..e179f53f3b2 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -284,6 +284,11 @@ public final class CircuitBreakerPreparedStatement extends 
AbstractUnsupportedOp
         return null;
     }
     
+    @Override
+    protected String getProcessId() {
+        return null;
+    }
+    
     @Override
     public ResultSet executeQuery() {
         return new CircuitBreakerResultSet();

Reply via email to