This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1ccc9ac6551 Adjust jdbc processId to statement level to avoid monitor
block when run benchmarksql (#30201)
1ccc9ac6551 is described below
commit 1ccc9ac6551741aeb0e806311ca48f9f5e737a67
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Tue Feb 20 10:13:53 2024 +0800
Adjust jdbc processId to statement level to avoid monitor block when run
benchmarksql (#30201)
* Adjust jdbc processId to statement level to avoid monitor block when run
benchmarksql
* use processId in PreparedStatement and Statement
---
.../driver/jdbc/adapter/AbstractStatementAdapter.java | 6 ++++++
.../jdbc/core/connection/ShardingSphereConnection.java | 9 ---------
.../core/statement/ShardingSpherePreparedStatement.java | 13 ++++++++-----
.../driver/jdbc/core/statement/ShardingSphereStatement.java | 11 +++++++----
.../circuit/statement/CircuitBreakerPreparedStatement.java | 5 +++++
5 files changed, 26 insertions(+), 18 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 0d1b4a37456..1614bb0c08e 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOper
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
@@ -48,6 +49,8 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
@Getter(AccessLevel.NONE)
private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new
ForceExecuteTemplate<>();
+ private final ProcessEngine processEngine = new ProcessEngine();
+
private boolean poolable;
private int fetchSize;
@@ -90,6 +93,8 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
protected abstract StatementManager getStatementManager();
+ protected abstract String getProcessId();
+
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public final void setPoolable(final boolean poolable) throws SQLException {
@@ -229,6 +234,7 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
}
} finally {
getRoutedStatements().clear();
+ processEngine.disconnect(getProcessId());
}
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 532a3743029..68989e91bfa 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,8 +24,6 @@ import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePrepar
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
import
org.apache.shardingsphere.driver.jdbc.exception.connection.ConnectionClosedException;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.transaction.api.TransactionType;
@@ -44,8 +42,6 @@ import java.sql.Statement;
*/
public final class ShardingSphereConnection extends AbstractConnectionAdapter {
- private final ProcessEngine processEngine = new ProcessEngine();
-
@Getter
private final String databaseName;
@@ -55,9 +51,6 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
@Getter
private final DriverDatabaseConnectionManager databaseConnectionManager;
- @Getter
- private final String processId;
-
private boolean autoCommit = true;
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@@ -70,7 +63,6 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
this.databaseName = databaseName;
this.contextManager = contextManager;
databaseConnectionManager = new
DriverDatabaseConnectionManager(databaseName, contextManager);
- processId = processEngine.connect(new Grantee("", ""), databaseName);
}
/**
@@ -309,7 +301,6 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
public void close() throws SQLException {
closed = true;
databaseConnectionManager.close();
- processEngine.disconnect(processId);
}
private ConnectionContext getConnectionContext() {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index a2075b54b28..5565a09b206 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -64,7 +64,6 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
@@ -150,6 +149,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
+ @Getter
+ private final String processId;
+
@Getter
private final boolean selectContainsEnhancedTable;
@@ -216,6 +218,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
trafficRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
selectContainsEnhancedTable = sqlStatementContext instanceof
SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
statementManager = new StatementManager();
+ processId = getProcessEngine().connect(new Grantee("", ""),
databaseName);
}
private boolean isStatementsCacheable(final RuleMetaData
databaseRuleMetaData) {
@@ -281,7 +284,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
+ prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
if (context.getInputGroups().isEmpty() ||
context.getInputGroups().iterator().next().getInputs().isEmpty()) {
throw new EmptyTrafficExecutionUnitException();
}
@@ -453,7 +456,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecutionContext
executionContext) throws SQLException {
@@ -529,7 +532,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", "")));
+ new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", "")));
}
@Override
@@ -721,7 +724,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
ExecutionUnit executionUnit = each.getExecutionUnit();
executionUnits.add(executionUnit);
}
-
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
executionUnits, new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", ""))));
+
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
executionUnits, new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", ""))));
setBatchParametersForStatements(batchExecutor);
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index e8daac199cd..7208285bb55 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -61,7 +61,6 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
import org.apache.shardingsphere.infra.instance.InstanceContext;
@@ -122,6 +121,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
+ @Getter
+ private final String processId;
+
private final BatchStatementExecutor batchStatementExecutor;
private boolean returnGeneratedKeys;
@@ -155,6 +157,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
databaseName = connection.getDatabaseName();
+ processId = getProcessEngine().connect(new Grantee("", ""),
databaseName);
}
@Override
@@ -478,7 +481,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
+ prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
}
@@ -525,13 +528,13 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", "")));
+ new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", "")));
}
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecuteCallback
executeCallback, final ExecutionContext executionContext) throws SQLException {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index ac096d5e97a..e179f53f3b2 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -284,6 +284,11 @@ public final class CircuitBreakerPreparedStatement extends
AbstractUnsupportedOp
return null;
}
+ @Override
+ protected String getProcessId() {
+ return null;
+ }
+
@Override
public ResultSet executeQuery() {
return new CircuitBreakerResultSet();