This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 64cc7af8693 Add transaction executor for driver to handle TCL
statements (#34173)
64cc7af8693 is described below
commit 64cc7af869315d3e49c3d5de94c60a4be57c8009
Author: ZhangCheng <[email protected]>
AuthorDate: Fri Dec 27 10:15:07 2024 +0800
Add transaction executor for driver to handle TCL statements (#34173)
* Add transaction executor for driver to handle TCL statements
* Add transaction executor for driver to handle TCL statements
* Add transaction executor for driver to handle TCL statements
---
RELEASE-NOTES.md | 1 +
.../tableless/TablelessRouteEngineFactory.java | 2 +-
.../executor/engine/DriverExecuteExecutor.java | 11 ++-
.../engine/facade/DriverExecutorFacade.java | 6 +-
.../DriverTransactionSQLStatementExecutor.java | 85 ++++++++++++++++++++++
5 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 943a601cf47..37de7afcafb 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -45,6 +45,7 @@
1. SQL Binder: Support copy statement sql bind and add bind test case -
[#34159](https://github.com/apache/shardingsphere/pull/34159)
1. SQL Binder: Support truncate table sql bind and add test case -
[#34162](https://github.com/apache/shardingsphere/pull/34162)
1. SQL Binder: Support create view, alter view, drop view sql bind logic -
[#34167](https://github.com/apache/shardingsphere/pull/34167)
+1. Transaction: Support savepoint/release savepoint TCL statements in jdbc
adapter -[#34173](https://github.com/apache/shardingsphere/pull/34173)
### Bug Fixes
diff --git
a/infra/route/src/main/java/org/apache/shardingsphere/infra/route/engine/tableless/TablelessRouteEngineFactory.java
b/infra/route/src/main/java/org/apache/shardingsphere/infra/route/engine/tableless/TablelessRouteEngineFactory.java
index c133e01f690..0d39734c0d9 100644
---
a/infra/route/src/main/java/org/apache/shardingsphere/infra/route/engine/tableless/TablelessRouteEngineFactory.java
+++
b/infra/route/src/main/java/org/apache/shardingsphere/infra/route/engine/tableless/TablelessRouteEngineFactory.java
@@ -74,7 +74,7 @@ public final class TablelessRouteEngineFactory {
if (sqlStatement instanceof DALStatement) {
return getDALRouteEngine(sqlStatement, database);
}
- // TODO remove this logic when savepoint handle in proxy and jdbc
adapter @zhangcheng
+ // TODO Support more TCL statements by transaction module, then remove
this.
if (sqlStatement instanceof TCLStatement) {
return new TablelessDataSourceBroadcastRouteEngine();
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index c13341e1a92..3ee2c70fc3d 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteExecutor;
import
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionSQLStatementExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
@@ -37,6 +38,7 @@ import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
@@ -63,15 +65,19 @@ public final class DriverExecuteExecutor {
private final SQLFederationEngine sqlFederationEngine;
+ private final DriverTransactionSQLStatementExecutor transactionExecutor;
+
private ExecuteType executeType;
public DriverExecuteExecutor(final ShardingSphereConnection connection,
final ShardingSphereMetaData metaData,
- final JDBCExecutor jdbcExecutor, final
RawExecutor rawExecutor, final SQLFederationEngine sqlFederationEngine) {
+ final JDBCExecutor jdbcExecutor, final
RawExecutor rawExecutor, final SQLFederationEngine sqlFederationEngine,
+ final DriverTransactionSQLStatementExecutor
transactionExecutor) {
this.connection = connection;
this.metaData = metaData;
jdbcPushDownExecutor = new
DriverJDBCPushDownExecuteExecutor(connection, metaData, jdbcExecutor);
rawPushDownExecutor = new DriverRawPushDownExecuteExecutor(connection,
metaData, rawExecutor);
this.sqlFederationEngine = sqlFederationEngine;
+ this.transactionExecutor = transactionExecutor;
}
/**
@@ -100,6 +106,9 @@ public final class DriverExecuteExecutor {
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
return true;
}
+ if (transactionExecutor.decide(queryContext)) {
+ return transactionExecutor.execute((TCLStatement)
queryContext.getSqlStatementContext().getSqlStatement());
+ }
ExecutionContext executionContext =
new KernelProcessor().generateExecutionContext(queryContext,
metaData.getGlobalRuleMetaData(), metaData.getProps());
return executePushDown(database, executionContext, prepareEngine,
executeCallback, addCallback, replayCallback);
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java
index 8301aef3b9b..3a193e56273 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplay
import org.apache.shardingsphere.driver.executor.engine.DriverExecuteExecutor;
import
org.apache.shardingsphere.driver.executor.engine.DriverExecuteQueryExecutor;
import
org.apache.shardingsphere.driver.executor.engine.DriverExecuteUpdateExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionSQLStatementExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -63,6 +64,8 @@ public final class DriverExecutorFacade implements
AutoCloseable {
private final SQLFederationEngine sqlFederationEngine;
+ private final DriverTransactionSQLStatementExecutor transactionExecutor;
+
private final DriverExecuteQueryExecutor queryExecutor;
private final DriverExecuteUpdateExecutor updateExecutor;
@@ -79,10 +82,11 @@ public final class DriverExecutorFacade implements
AutoCloseable {
String currentSchemaName = new
DatabaseTypeRegistry(metaData.getDatabase(connection.getCurrentDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getCurrentDatabaseName());
sqlFederationEngine =
new SQLFederationEngine(connection.getCurrentDatabaseName(),
currentSchemaName, metaData,
connection.getContextManager().getMetaDataContexts().getStatistics(),
jdbcExecutor);
+ transactionExecutor = new
DriverTransactionSQLStatementExecutor(connection);
RawExecutor rawExecutor = new
RawExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
queryExecutor = new DriverExecuteQueryExecutor(connection, metaData,
jdbcExecutor, rawExecutor, sqlFederationEngine);
updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData,
jdbcExecutor, rawExecutor);
- executeExecutor = new DriverExecuteExecutor(connection, metaData,
jdbcExecutor, rawExecutor, sqlFederationEngine);
+ executeExecutor = new DriverExecuteExecutor(connection, metaData,
jdbcExecutor, rawExecutor, sqlFederationEngine, transactionExecutor);
}
/**
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/transaction/DriverTransactionSQLStatementExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/transaction/DriverTransactionSQLStatementExecutor.java
new file mode 100644
index 00000000000..1c15a8047ae
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/transaction/DriverTransactionSQLStatementExecutor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.transaction;
+
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.driver.jdbc.core.savepoint.ShardingSphereSavepoint;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.ReleaseSavepointStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.SavepointStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement;
+import org.apache.shardingsphere.transaction.core.TransactionOperationType;
+
+import java.sql.SQLException;
+
+/**
+ * Driver transaction statement executor.
+ */
+public final class DriverTransactionSQLStatementExecutor {
+
+ private final ShardingSphereConnection connection;
+
+ private TransactionOperationType operationType;
+
+ public DriverTransactionSQLStatementExecutor(final
ShardingSphereConnection connection) {
+ this.connection = connection;
+ }
+
+ /**
+ * Decide whether to execute TCL statement.
+ *
+ * @param queryContext query context
+ * @return whether to execute TCL statement or not
+ */
+ public boolean decide(final QueryContext queryContext) {
+ if (!(queryContext.getSqlStatementContext().getSqlStatement()
instanceof TCLStatement)) {
+ return false;
+ }
+ TCLStatement tclStatement = (TCLStatement)
queryContext.getSqlStatementContext().getSqlStatement();
+ if (tclStatement instanceof SavepointStatement) {
+ operationType = TransactionOperationType.SAVEPOINT;
+ return true;
+ }
+ if (tclStatement instanceof ReleaseSavepointStatement) {
+ operationType = TransactionOperationType.RELEASE_SAVEPOINT;
+ return true;
+ }
+ // TODO support more TCL statements
+ return false;
+ }
+
+ /**
+ * Execute TCL statement.
+ *
+ * @param tclStatement SQL statement
+ * @return whether to execute TCL statement or not
+ * @throws SQLException SQL exception
+ */
+ public boolean execute(final TCLStatement tclStatement) throws
SQLException {
+ if (TransactionOperationType.SAVEPOINT == operationType) {
+ connection.setSavepoint(((SavepointStatement)
tclStatement).getSavepointName());
+ return true;
+ }
+ if (TransactionOperationType.RELEASE_SAVEPOINT == operationType) {
+ ShardingSphereSavepoint savepoint = new
ShardingSphereSavepoint(((ReleaseSavepointStatement)
tclStatement).getSavepointName());
+ connection.releaseSavepoint(savepoint);
+ return true;
+ }
+ return false;
+ }
+}