beanww commented on issue #7686:
URL:
https://github.com/apache/incubator-seata/issues/7686#issuecomment-3509271955
这个问题,我自己解决了,提供思路如下:
不使用mybatis默认提供的SpringManagedTransaction管理spring-mybatis下的@Transaction事务,因为这个实现只支持一个数据源,自己实现一个MultiDsSpringManagedTransaction,在每次getConnection时判断是否有@DS注解导致的数据源变更,如果变更则使用新数据源,并保存所有涉及到的数据源,在close方法时对所有涉及到的数据源发起close调用(仍然使用spring的DataSourceUtils.releaseConnection,只不过是对多个connection的操作)
这样改造后,就可以在seata的AT/XA模式下,在@GlobalTransactional
@Transactional注解都开启的情况下,仍然可以通过@DS注解在任何类(主要是指Mapper上)切换到其他数据源,即实现了seata-XA/AT支持在一个service内切换多个数据源,无需拆分成多个服务的效果。
I have resolved this issue independently and would like to share the
approach:
Instead of using the default SpringManagedTransaction provided by MyBatis
for transaction management with @Transactional in a Spring-MyBatis environment
(which only supports a single data source), I implemented a custom
MultiDsSpringManagedTransaction.
The core logic is as follows:
Dynamic Data Source Detection:
Override the getConnection() method to check if a data source switch is
required based on the presence of the @DS annotation (or other custom logic).
If a switch is detected, obtain a connection from the new target data source
instead.
Track Multiple Connections:
Maintain a collection (e.g., List<Connection>) to track all connections
acquired from different data sources during the transaction.
Multi-Connection Closure:
In the close() method, iterate over all tracked connections and release each
one using Spring's DataSourceUtils.releaseConnection().
This modification enables Seata's AT/XA mode to work seamlessly with
@GlobalTransactional and @Transactional annotations, while still allowing
dynamic data source switching via @DS (primarily on Mapper interfaces).
Key Outcome:Achieves multi-data-source switching within a single service
method under Seata's global transaction management, eliminating the need to
split logic across multiple services.
eg code:
`public class MultiDsSpringManagedTransactionFactory extends
SpringManagedTransactionFactory implements TransactionFactory {
@Override
public Transaction newTransaction(Connection conn) {
throw new UnsupportedOperationException("New Spring
transactions require a DataSource");
}
@Override
public Transaction newTransaction(DataSource dataSource,
TransactionIsolationLevel level, boolean autoCommit) {
return new MultiDsSpringManagedTransaction(dataSource);
}
}`
`public class MultiDsSpringManagedTransaction implements Transaction {
// private static final Logger LOGGER =
LoggerFactory.getLogger(SpringManagedTransaction.class);
private final DataSource dataSource;
private Connection curConnection;
private Map<DataSource, Connection> connectionsInTrans = new
HashMap<>();
private boolean isConnectionTransactional;
private boolean autoCommit;
public MultiDsSpringManagedTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
}
/**
* {@inheritDoc}
*/
@Override
public Connection getConnection() throws SQLException {
// 定制逻辑:
//
mybatis每一次sql执行前都会触发此方法,在此方法再执行一次动态数据源判断,就可以把数据源切换精细到mapper方法级,小于之前的远程服务级范围
DataSource determineDataSource = dataSource;
if (dataSource instanceof DynamicRoutingDataSource) {
determineDataSource = ((DynamicRoutingDataSource)
dataSource).determineDataSource();
}
//
每个事务内使用的数据源,都记录到dataSouresInTrans里,如果切换到新数据源了,则再次调用openConnection获取
Connection usedConn =
connectionsInTrans.get(determineDataSource);
if (null == usedConn) {
log.debug("准备从数据源获取新数据库连接: {}", determineDataSource);
openConnection(determineDataSource);
} else {
this.curConnection =
connectionsInTrans.get(determineDataSource);
log.debug("复用已有数据库连接: Connection={}",
this.curConnection);
}
return this.curConnection;
}
/**
* Gets a connection from Spring transaction manager and discovers if
this
* {@code Transaction} should manage connection or let it to Spring.
* <p>
* It also reads autocommit setting because when using Spring
Transaction
* MyBatis thinks that autocommit is always false and will always call
* commit/rollback so we need to no-op that calls.
*/
private void openConnection(DataSource curDataSource) throws
SQLException {
// 定制逻辑:
// 获取指定数据源的连接,这时会注册conn的synchronization,里面有各个时机的回调逻辑
this.curConnection =
DataSourceUtils.getConnection(curDataSource);
if(log.isDebugEnabled()) {
log.debug("获取新数据库连接: Connection={}", this.curConnection
== null ? "NA" : this.curConnection.hashCode());
}
// 把连接跟ds关联关系记录下来,在一个事务中,一个ds只需要打开一个conn
Connection preConn = connectionsInTrans.put(curDataSource,
curConnection);
if(preConn != null && preConn != curConnection) {
throw new IllegalArgumentException("Already has
Connection with datasource:" + curDataSource);
}
this.autoCommit = this.curConnection.getAutoCommit();
this.isConnectionTransactional =
DataSourceUtils.isConnectionTransactional(this.curConnection, curDataSource);
log.debug("JDBC Connection [" + this.curConnection + "] will" +
(this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
/**
* {@inheritDoc}
*/
@Override
public void commit() throws SQLException {
if (this.curConnection != null &&
!this.isConnectionTransactional && !this.autoCommit) {
log.debug("Committing JDBC Connection [" +
this.curConnection + "]");
this.curConnection.commit();
}
}
/**
* {@inheritDoc}
*/
@Override
public void rollback() throws SQLException {
if (this.curConnection != null &&
!this.isConnectionTransactional && !this.autoCommit) {
log.debug("Rolling back JDBC Connection [" +
this.curConnection + "]");
this.curConnection.rollback();
}
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws SQLException {
log.debug("关闭事务中的 {} 个数据库连接", connectionsInTrans.size());
Set<Entry<DataSource, Connection>> entrySet =
connectionsInTrans.entrySet();
for(Entry<DataSource, Connection> entry: entrySet) {
if(log.isDebugEnabled()) {
log.debug("准备释放数据库连接: Connection={}",
entry.getValue() == null ? "NA" : entry.getValue().hashCode());
}
releaseConnection(entry.getValue(), entry.getKey());
}
}
/**
* Close the given Connection, obtained from the given DataSource, if
it is not
* managed externally (that is, not bound to the thread).
*
* @param con the Connection to close if necessary (if this is
* {@code null}, the call will be ignored)
* @param dataSource the DataSource that the Connection was obtained
from (may
* be {@code null})
* @see #getConnection
*/
public static void releaseConnection(@Nullable Connection con,
@Nullable DataSource dataSource) {
try {
doReleaseConnection(con, dataSource);
} catch (SQLException ex) {
log.debug("Could not close JDBC Connection", ex);
} catch (Throwable ex) {
log.debug("Unexpected exception on closing JDBC
Connection", ex);
}
}
/**
* Actually close the given Connection, obtained from the given
DataSource. Same
* as {@link #releaseConnection}, but throwing the original
SQLException.
* <p>
* Directly accessed by {@link TransactionAwareDataSourceProxy}.
*
* @param con the Connection to close if necessary (if this is
* {@code null}, the call will be ignored)
* @param dataSource the DataSource that the Connection was obtained
from (may
* be {@code null})
* @throws SQLException if thrown by JDBC methods
* @see #doGetConnection
*/
public static void doReleaseConnection(@Nullable Connection con,
@Nullable DataSource dataSource)
throws SQLException {
if (con == null) {
return;
}
if (dataSource != null) {
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && connectionEquals(conHolder,
con)) {
// It's the transactional Connection: Don't
close it.
conHolder.released();
if(log.isDebugEnabled()) {
log.debug("数据库连接使用数--: Connection={}",
con == null ? "NA" : con.hashCode());
}
return;
}
}
doCloseConnection(con, dataSource);
}
/**
* Determine whether the given two Connections are equal, asking the
target
* Connection in case of a proxy. Used to detect equality even if the
* user passed in a raw target Connection while the held one is a proxy.
* @param conHolder the ConnectionHolder for the held Connection
(potentially a proxy)
* @param passedInCon the Connection passed-in by the user
* (potentially a target Connection without proxy)
* @return whether the given Connections are equal
* @see #getTargetConnection
*/
private static boolean connectionEquals(ConnectionHolder conHolder,
Connection passedInCon) {
if (conHolder.getConnectionHandle() == null) {
return false;
}
Connection heldCon = conHolder.getConnection();
// Explicitly check for identity too: for Connection handles
that do not implement
// "equals" properly, such as the ones Commons DBCP exposes).
return (heldCon == passedInCon || heldCon.equals(passedInCon) ||
getTargetConnection(heldCon).equals(passedInCon));
}
/**
* Return the innermost target Connection of the given Connection. If
the given
* Connection is a proxy, it will be unwrapped until a non-proxy
Connection is
* found. Otherwise, the passed-in Connection will be returned as-is.
* @param con the Connection proxy to unwrap
* @return the innermost target Connection, or the passed-in one if no
proxy
* @see ConnectionProxy#getTargetConnection()
*/
public static Connection getTargetConnection(Connection con) {
Connection conToUse = con;
while (conToUse instanceof ConnectionProxy connectionProxy) {
conToUse = connectionProxy.getTargetConnection();
}
return conToUse;
}
/**
* Close the Connection, unless a {@link SmartDataSource} doesn't want
us to.
*
* @param con the Connection to close if necessary
* @param dataSource the DataSource that the Connection was obtained
from
* @throws SQLException if thrown by JDBC methods
* @see Connection#close()
* @see SmartDataSource#shouldClose(Connection)
*/
public static void doCloseConnection(Connection con, @Nullable
DataSource dataSource) throws SQLException {
if (!(dataSource instanceof SmartDataSource smartDataSource) ||
smartDataSource.shouldClose(con)) {
if(log.isDebugEnabled()) {
log.debug("【实际】释放数据库连接: Connection={}",
con.hashCode());
}
con.close();
} else {
if(log.isDebugEnabled()) {
log.debug("【忽略】释放数据库连接: Connection={}", con ==
null ? "NA" : con.hashCode());
}
}
}
/**
* {@inheritDoc}
*/
@Override
public Integer getTimeout() throws SQLException {
ConnectionHolder holder = (ConnectionHolder)
TransactionSynchronizationManager.getResource(dataSource);
if (holder != null && holder.hasTimeout()) {
return holder.getTimeToLiveInSeconds();
}
return null;
}
}`
`@AutoConfiguration
@ConditionalOnExpression("${seata.enabled:false} &&
${cptop.mybatis.multi-ds-transaction.enabled:true}")
public class MultiDsSpringManagedTransactionConfig {
/**
* 注册 MultiDsSpringManagedTransactionFactory Bean
* <p>
* 用于支持 MyBatis 在 Seata 分布式事务场景下的多数据源事务管理
* <p>
* 使用 @Primary 注解确保此 TransactionFactory 优先于默认的
SpringManagedTransactionFactory
*/
@Bean
@Primary
public TransactionFactory multiDsSpringManagedTransactionFactory() {
return new MultiDsSpringManagedTransactionFactory();
}
}`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]