This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e16dff4b3c3 Support distributed transactions across multiple logical
database(#19… (#20114)
e16dff4b3c3 is described below
commit e16dff4b3c3b14bdce7acd9945d8bd1e2570624a
Author: ZhangCheng <[email protected]>
AuthorDate: Tue Aug 16 09:14:50 2022 +0800
Support distributed transactions across multiple logical database(#19…
(#20114)
* Support distributed transactions across multiple logical database(#19894)
* Fix test case
* Generate data source name
* Add test case
* Remove final
* JDBC does not support operations across multiple logical databases in
transaction
* Fix equals usage
---
.../core/statement/ShardingSphereStatement.java | 21 +++++++++++++++++++++
.../transaction/ConnectionTransaction.java | 5 ++++-
.../transaction/core/ResourceDataSource.java | 5 ++++-
.../transaction/rule/TransactionRule.java | 5 ++++-
.../spi/ShardingSphereTransactionManager.java | 3 ++-
.../transaction/core/ResourceDataSourceTest.java | 16 +++++++++++++---
...therShardingSphereTransactionManagerFixture.java | 2 +-
.../ShardingSphereTransactionManagerFixture.java | 2 +-
.../at/SeataATShardingSphereTransactionManager.java | 4 ++--
...SeataATShardingSphereTransactionManagerTest.java | 8 +++++---
.../xa/XAShardingSphereTransactionManager.java | 4 ++--
.../xa/XAShardingSphereTransactionManagerTest.java | 18 +++++++++---------
.../jdbc/datasource/JDBCBackendDataSource.java | 2 +-
.../proxy/backend/session/ConnectionSession.java | 4 ----
.../backend/session/ConnectionSessionTest.java | 4 ++--
15 files changed, 71 insertions(+), 32 deletions(-)
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index e513f29f18a..bfbf283526f 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -37,6 +37,7 @@ import
org.apache.shardingsphere.infra.binder.segment.insert.keygen.GeneratedKey
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
@@ -78,6 +79,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.TransactionHolder;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -156,6 +158,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
ResultSet result;
try {
LogicSQL logicSQL = createLogicSQL(sql);
+
checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, logicSQL);
@@ -235,6 +238,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql) throws SQLException {
try {
LogicSQL logicSQL = createLogicSQL(sql);
+
checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, logicSQL);
@@ -263,6 +267,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
try {
LogicSQL logicSQL = createLogicSQL(sql);
+
checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, logicSQL);
@@ -289,6 +294,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
returnGeneratedKeys = true;
try {
LogicSQL logicSQL = createLogicSQL(sql);
+
checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, logicSQL);
@@ -315,6 +321,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
returnGeneratedKeys = true;
try {
LogicSQL logicSQL = createLogicSQL(sql);
+
checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, logicSQL);
@@ -431,6 +438,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private boolean execute0(final String sql, final ExecuteCallback callback)
throws SQLException {
try {
LogicSQL logicSQL = createLogicSQL(sql);
+
checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, logicSQL);
@@ -455,6 +463,19 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
+ private void checkSameDatabaseNameInTransaction(final
SQLStatementContext<?> sqlStatementContext, final String
connectionDatabaseName) {
+ if (!TransactionHolder.isTransaction()) {
+ return;
+ }
+ if (sqlStatementContext instanceof TableAvailable) {
+ ((TableAvailable)
sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(databaseName
-> {
+ if (!databaseName.equals(connectionDatabaseName)) {
+ throw new ShardingSphereException("JDBC does not support
operations across multiple logical databases in transaction.");
+ }
+ });
+ }
+ }
+
private JDBCExecutionUnit createTrafficExecutionUnit(final String
trafficInstanceId, final LogicSQL logicSQL) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(logicSQL.getSql(), logicSQL.getParameters()));
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
index a7bb4d30a05..5d8fc653771 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
@@ -35,6 +35,8 @@ public final class ConnectionTransaction {
private final TransactionType transactionType;
+ private final String databaseName;
+
@Setter
@Getter
private volatile boolean rollbackOnly;
@@ -46,6 +48,7 @@ public final class ConnectionTransaction {
}
public ConnectionTransaction(final String databaseName, final
TransactionType transactionType, final TransactionRule rule) {
+ this.databaseName = databaseName;
this.transactionType = transactionType;
transactionManager =
rule.getResource().getTransactionManager(transactionType);
TransactionTypeHolder.set(transactionType);
@@ -87,7 +90,7 @@ public final class ConnectionTransaction {
* @throws SQLException SQL exception
*/
public Optional<Connection> getConnection(final String dataSourceName)
throws SQLException {
- return isInTransaction() ?
Optional.of(transactionManager.getConnection(dataSourceName)) :
Optional.empty();
+ return isInTransaction() ?
Optional.of(transactionManager.getConnection(this.databaseName,
dataSourceName)) : Optional.empty();
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java
index 886290b2175..d85fb9f7a8a 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.transaction.core;
+import com.google.common.base.Preconditions;
import lombok.Getter;
import javax.sql.DataSource;
@@ -34,8 +35,10 @@ public final class ResourceDataSource {
private final DataSource dataSource;
public ResourceDataSource(final String originalName, final DataSource
dataSource) {
+ String[] databaseAndDataSourceName = originalName.split("\\.");
+ Preconditions.checkState(2 == databaseAndDataSourceName.length,
String.format("Database and data source name must be provided,`%s`.",
originalName));
this.originalName = originalName;
this.dataSource = dataSource;
- uniqueResourceName = ResourceIDGenerator.getInstance().nextId() +
originalName;
+ this.uniqueResourceName = ResourceIDGenerator.getInstance().nextId() +
databaseAndDataSourceName[1];
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
index 9749083bc2f..d1e542c51be 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
@@ -72,7 +72,10 @@ public final class TransactionRule implements GlobalRule,
ResourceHeldRule<Shard
Map<String, DataSource> dataSourceMap = new
HashMap<>(databases.size());
Set<DatabaseType> databaseTypes = new HashSet<>();
for (Entry<String, ShardingSphereDatabase> entry :
databases.entrySet()) {
-
dataSourceMap.putAll(entry.getValue().getResource().getDataSources());
+ ShardingSphereDatabase database = entry.getValue();
+ database.getResource().getDataSources().forEach((key, value) -> {
+ dataSourceMap.put(database.getName() + "." + key, value);
+ });
if (null != entry.getValue().getResource().getDatabaseType()) {
databaseTypes.add(entry.getValue().getResource().getDatabaseType());
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java
index e56bd238456..702a5208c09 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java
@@ -56,11 +56,12 @@ public interface ShardingSphereTransactionManager extends
AutoCloseable {
/**
* Get transactional connection.
*
+ * @param databaseName database name
* @param dataSourceName data source name
* @return connection
* @throws SQLException SQL exception
*/
- Connection getConnection(String dataSourceName) throws SQLException;
+ Connection getConnection(String databaseName, String dataSourceName)
throws SQLException;
/**
* Begin transaction.
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java
index eb88846db62..f376edd652e 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java
@@ -27,12 +27,22 @@ import static org.junit.Assert.assertTrue;
public final class ResourceDataSourceTest {
+ private static final String DATABASE_NAME = "sharding_db";
+
+ private static final String DATA_SOURCE_NAME = "fooDataSource";
+
@Test
public void assertNewInstance() {
- ResourceDataSource actual = new ResourceDataSource("fooDataSource",
new MockedDataSource());
- assertThat(actual.getOriginalName(), is("fooDataSource"));
+ String originalName = DATABASE_NAME + "." + DATA_SOURCE_NAME;
+ ResourceDataSource actual = new ResourceDataSource(originalName, new
MockedDataSource());
+ assertThat(actual.getOriginalName(), is(originalName));
assertThat(actual.getDataSource(), instanceOf(MockedDataSource.class));
assertTrue(actual.getUniqueResourceName().startsWith("resource"));
- assertTrue(actual.getUniqueResourceName().endsWith("fooDataSource"));
+ assertTrue(actual.getUniqueResourceName().endsWith(DATA_SOURCE_NAME));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void assertDataSourceNameOnlyFailure() {
+ new ResourceDataSource(DATA_SOURCE_NAME, new MockedDataSource());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java
index 807486266c5..effa0645d15 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java
@@ -42,7 +42,7 @@ public final class
OtherShardingSphereTransactionManagerFixture implements Shard
}
@Override
- public Connection getConnection(final String dataSourceName) {
+ public Connection getConnection(final String databaseName, final String
dataSourceName) {
return null;
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java
index 1acabde96a4..5f52e4b5691 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java
@@ -49,7 +49,7 @@ public final class ShardingSphereTransactionManagerFixture
implements ShardingSp
}
@Override
- public Connection getConnection(final String dataSourceName) {
+ public Connection getConnection(final String databaseName, final String
dataSourceName) {
return null;
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/se
[...]
index 7e1d3710566..6b075db1905 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java
@@ -90,9 +90,9 @@ public final class SeataATShardingSphereTransactionManager
implements ShardingSp
}
@Override
- public Connection getConnection(final String dataSourceName) throws
SQLException {
+ public Connection getConnection(final String databaseName, final String
dataSourceName) throws SQLException {
Preconditions.checkState(enableSeataAT, "sharding seata-at transaction
has been disabled.");
- return dataSourceMap.get(dataSourceName).getConnection();
+ return dataSourceMap.get(databaseName + "." +
dataSourceName).getConnection();
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/bas
[...]
index 6fba061a6c5..033b85c60f3 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java
@@ -61,6 +61,8 @@ public final class
SeataATShardingSphereTransactionManagerTest {
private static final MockSeataServer MOCK_SEATA_SERVER = new
MockSeataServer();
+ private static final String DATA_SOURCE_UNIQUE_NAME = "sharding_db.foo_ds";
+
private final SeataATShardingSphereTransactionManager
seataTransactionManager = new SeataATShardingSphereTransactionManager();
private final Queue<Object> requestQueue =
MOCK_SEATA_SERVER.getMessageHandler().getRequestQueue();
@@ -84,7 +86,7 @@ public final class
SeataATShardingSphereTransactionManagerTest {
@Before
public void setUp() {
- seataTransactionManager.init(DatabaseTypeFactory.getInstance("MySQL"),
Collections.singletonList(new ResourceDataSource("foo_ds", new
MockedDataSource())), "Seata");
+ seataTransactionManager.init(DatabaseTypeFactory.getInstance("MySQL"),
Collections.singletonList(new ResourceDataSource(DATA_SOURCE_UNIQUE_NAME, new
MockedDataSource())), "Seata");
}
@After
@@ -102,13 +104,13 @@ public final class
SeataATShardingSphereTransactionManagerTest {
public void assertInit() {
Map<String, DataSource> actual = getDataSourceMap();
assertThat(actual.size(), is(1));
- assertThat(actual.get("foo_ds"), instanceOf(DataSourceProxy.class));
+ assertThat(actual.get(DATA_SOURCE_UNIQUE_NAME),
instanceOf(DataSourceProxy.class));
assertThat(seataTransactionManager.getTransactionType(),
is(TransactionType.BASE));
}
@Test
public void assertGetConnection() throws SQLException {
- Connection actual = seataTransactionManager.getConnection("foo_ds");
+ Connection actual =
seataTransactionManager.getConnection("sharding_db", "foo_ds");
assertThat(actual, instanceOf(ConnectionProxy.class));
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java
index 2b196d25c54..61153058a03 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java
@@ -74,9 +74,9 @@ public final class XAShardingSphereTransactionManager
implements ShardingSphereT
}
@Override
- public Connection getConnection(final String dataSourceName) throws
SQLException {
+ public Connection getConnection(final String databaseName, final String
dataSourceName) throws SQLException {
try {
- return cachedDataSources.get(dataSourceName).getConnection();
+ return cachedDataSources.get(databaseName + "." +
dataSourceName).getConnection();
} catch (final SystemException | RollbackException ex) {
throw new SQLException(ex);
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManag
[...]
index 57a23e2b9c3..a0159a960b5 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java
@@ -82,9 +82,9 @@ public final class XAShardingSphereTransactionManagerTest {
@Test
public void assertGetConnection() throws SQLException {
xaTransactionManager.begin();
- Connection actual1 = xaTransactionManager.getConnection("ds1");
- Connection actual2 = xaTransactionManager.getConnection("ds2");
- Connection actual3 = xaTransactionManager.getConnection("ds3");
+ Connection actual1 = xaTransactionManager.getConnection("demo_ds_1",
"ds1");
+ Connection actual2 = xaTransactionManager.getConnection("demo_ds_2",
"ds2");
+ Connection actual3 = xaTransactionManager.getConnection("demo_ds_3",
"ds3");
assertThat(actual1, instanceOf(Connection.class));
assertThat(actual2, instanceOf(Connection.class));
assertThat(actual3, instanceOf(Connection.class));
@@ -93,10 +93,10 @@ public final class XAShardingSphereTransactionManagerTest {
@Test
public void assertGetConnectionOfNestedTransaction() throws SQLException {
- ThreadLocal<Map<Transaction, Connection>> transactions =
getEnlistedTransactions(getCachedDataSources().get("ds1"));
+ ThreadLocal<Map<Transaction, Connection>> transactions =
getEnlistedTransactions(getCachedDataSources().get("demo_ds_1.ds1"));
xaTransactionManager.begin();
assertTrue(transactions.get().isEmpty());
- xaTransactionManager.getConnection("ds1");
+ xaTransactionManager.getConnection("demo_ds_1", "ds1");
assertThat(transactions.get().size(), is(1));
executeNestedTransaction(transactions);
assertThat(transactions.get().size(), is(1));
@@ -106,7 +106,7 @@ public final class XAShardingSphereTransactionManagerTest {
private void executeNestedTransaction(final ThreadLocal<Map<Transaction,
Connection>> transactions) throws SQLException {
xaTransactionManager.begin();
- xaTransactionManager.getConnection("ds1");
+ xaTransactionManager.getConnection("demo_ds_1", "ds1");
assertThat(transactions.get().size(), is(2));
xaTransactionManager.commit(false);
assertThat(transactions.get().size(), is(1));
@@ -153,9 +153,9 @@ public final class XAShardingSphereTransactionManagerTest {
private Collection<ResourceDataSource> createResourceDataSources(final
DatabaseType databaseType) {
List<ResourceDataSource> result = new LinkedList<>();
- result.add(new ResourceDataSource("ds1",
DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_1")));
- result.add(new ResourceDataSource("ds2",
DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_2")));
- result.add(new ResourceDataSource("ds3",
DataSourceUtils.build(AtomikosDataSourceBean.class, databaseType,
"demo_ds_3")));
+ result.add(new ResourceDataSource("demo_ds_1.ds1",
DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_1")));
+ result.add(new ResourceDataSource("demo_ds_2.ds2",
DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_2")));
+ result.add(new ResourceDataSource("demo_ds_3.ds3",
DataSourceUtils.build(AtomikosDataSourceBean.class, databaseType,
"demo_ds_3")));
return result;
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java
index ed631b4b91f..ae329b3366f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java
@@ -106,7 +106,7 @@ public final class JDBCBackendDataSource implements
BackendDataSource {
private Connection createConnection(final String databaseName, final
String dataSourceName, final DataSource dataSource, final TransactionType
transactionType) throws SQLException {
TransactionRule transactionRule =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class);
ShardingSphereTransactionManager transactionManager =
transactionRule.getResource().getTransactionManager(transactionType);
- Connection result = isInTransaction(transactionManager) ?
transactionManager.getConnection(dataSourceName) : dataSource.getConnection();
+ Connection result = isInTransaction(transactionManager) ?
transactionManager.getConnection(databaseName, dataSourceName) :
dataSource.getConnection();
if (dataSourceName.contains(".")) {
String catalog = dataSourceName.split("\\.")[1];
result.setCatalog(catalog);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index f9cba6704fe..a86edd93b10 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -23,7 +23,6 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.session.ConnectionContext;
@@ -104,9 +103,6 @@ public final class ConnectionSession {
if (null != databaseName && databaseName.equals(this.databaseName)) {
return;
}
- if (transactionStatus.isInTransaction()) {
- throw new ShardingSphereException("Failed to switch database,
please terminate current transaction.");
- }
if (statementManager instanceof JDBCBackendStatement) {
((JDBCBackendStatement)
statementManager).setDatabaseName(databaseName);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
index cb9739aa703..2dc57a7c380 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
@@ -78,8 +78,8 @@ public final class ConnectionSessionTest extends
ProxyContextRestorer {
connectionSession.getTransactionStatus().setTransactionType(TransactionType.XA);
}
- @Test(expected = ShardingSphereException.class)
- public void assertFailedSwitchSchemaWhileBegin() throws SQLException {
+ @Test
+ public void assertSwitchSchemaWhileBegin() throws SQLException {
connectionSession.setCurrentDatabase("db");
JDBCBackendTransactionManager transactionManager = new
JDBCBackendTransactionManager(backendConnection);
transactionManager.begin();