beanww commented on issue #7686:
URL: 
https://github.com/apache/incubator-seata/issues/7686#issuecomment-3509271955

   I resolved this issue myself. Here is the approach.

   Instead of using the default SpringManagedTransaction that MyBatis-Spring provides for @Transactional transactions in a Spring-MyBatis environment (it supports only a single data source), I implemented a custom MultiDsSpringManagedTransaction.

   The core logic is as follows:

   Dynamic data source detection:
   On every getConnection() call, check whether the @DS annotation (or other custom logic) has changed the target data source. If it has, obtain a connection from the new data source instead.

   Track multiple connections:
   Keep a collection (e.g., a Map<DataSource, Connection>) of every connection acquired from the different data sources during the transaction.

   Multi-connection closure:
   In close(), iterate over all tracked connections and release each one with the same logic as Spring's DataSourceUtils.releaseConnection(), applied once per connection.

   With this change, Seata's AT/XA modes work with @GlobalTransactional and @Transactional both enabled, while @DS can still switch data sources on any class (primarily Mapper interfaces).

   Key outcome: Seata XA/AT supports switching between multiple data sources within a single service, with no need to split the logic across multiple services.
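
   The three steps above can be illustrated with a minimal plain-Java sketch. It is not the implementation from this comment: it uses no Spring, MyBatis, or Seata types, and the names MultiDsTransactionSketch, FakeConnection, and CURRENT_DS are hypothetical stand-ins (CURRENT_DS plays the role of the lookup key that @DS would set for the current thread).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; all names here are hypothetical. */
class MultiDsTransactionSketch {

    /** Stand-in for a JDBC connection obtained from one data source. */
    static final class FakeConnection {
        final String dataSourceKey;
        boolean released;

        FakeConnection(String dataSourceKey) {
            this.dataSourceKey = dataSourceKey;
        }
    }

    /** Stand-in for the per-thread data source key that @DS would select. */
    static final ThreadLocal<String> CURRENT_DS = ThreadLocal.withInitial(() -> "master");

    /** One connection per data source key, kept in order of first use. */
    private final Map<String, FakeConnection> connectionsInTrans = new LinkedHashMap<>();

    /** Called before every statement: re-resolve the key, then reuse or open. */
    FakeConnection getConnection() {
        return connectionsInTrans.computeIfAbsent(CURRENT_DS.get(), FakeConnection::new);
    }

    /** Releases every connection touched during the transaction. */
    List<String> close() {
        List<String> releasedKeys = new ArrayList<>();
        for (FakeConnection connection : connectionsInTrans.values()) {
            connection.released = true;
            releasedKeys.add(connection.dataSourceKey);
        }
        connectionsInTrans.clear();
        return releasedKeys;
    }

    public static void main(String[] args) {
        MultiDsTransactionSketch tx = new MultiDsTransactionSketch();
        FakeConnection first = tx.getConnection();   // opens "master"
        CURRENT_DS.set("orders");                    // simulates a mapper switched by @DS
        tx.getConnection();                          // opens "orders"
        CURRENT_DS.set("master");
        FakeConnection third = tx.getConnection();   // reuses the "master" connection
        System.out.println(first == third);          // prints true
        System.out.println(tx.close());              // prints [master, orders]
        CURRENT_DS.remove();
    }
}
```

   The point of the sketch is that the data source is resolved on every getConnection() call rather than once per transaction, which is what allows switching at mapper-method granularity.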
   
   Example code:
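```java
import java.sql.Connection;

import javax.sql.DataSource;

import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.apache.ibatis.transaction.TransactionFactory;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;

public class MultiDsSpringManagedTransactionFactory extends SpringManagedTransactionFactory implements TransactionFactory {

    @Override
    public Transaction newTransaction(Connection conn) {
        throw new UnsupportedOperationException("New Spring transactions require a DataSource");
    }

    // Hands MyBatis the multi-data-source transaction instead of SpringManagedTransaction.
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        return new MultiDsSpringManagedTransaction(dataSource);
    }

}
```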
   
```java
import static org.springframework.util.Assert.notNull;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.ibatis.transaction.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.ConnectionHolder;
import org.springframework.jdbc.datasource.ConnectionProxy;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.datasource.SmartDataSource;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.TransactionSynchronizationManager;

// Assumed to come from the baomidou dynamic-datasource library that provides @DS.
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;

public class MultiDsSpringManagedTransaction implements Transaction {

    private static final Logger log = LoggerFactory.getLogger(MultiDsSpringManagedTransaction.class);

    private final DataSource dataSource;

    private Connection curConnection;

    // One connection per data source used in this transaction.
    private final Map<DataSource, Connection> connectionsInTrans = new HashMap<>();

    private boolean isConnectionTransactional;

    private boolean autoCommit;

    public MultiDsSpringManagedTransaction(DataSource dataSource) {
        notNull(dataSource, "No DataSource specified");
        this.dataSource = dataSource;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getConnection() throws SQLException {
        // Custom logic: MyBatis calls this method before every SQL execution, so
        // re-resolving the dynamic data source here narrows switching to the
        // mapper-method level, finer than the previous remote-service level.
        DataSource determineDataSource = dataSource;
        if (dataSource instanceof DynamicRoutingDataSource) {
            determineDataSource = ((DynamicRoutingDataSource) dataSource).determineDataSource();
        }
        // Every data source used in the transaction is recorded in connectionsInTrans;
        // after a switch to a new data source, openConnection is called again.
        Connection usedConn = connectionsInTrans.get(determineDataSource);
        if (usedConn == null) {
            log.debug("Acquiring a new connection from data source: {}", determineDataSource);
            openConnection(determineDataSource);
        } else {
            this.curConnection = usedConn;
            log.debug("Reusing existing connection: Connection={}", this.curConnection);
        }

        return this.curConnection;
    }

    /**
     * Gets a connection from the Spring transaction manager and discovers whether
     * this {@code Transaction} should manage the connection or leave it to Spring.
     * <p>
     * It also reads the autocommit setting: when using Spring transactions, MyBatis
     * assumes autocommit is always false and always calls commit/rollback, so
     * those calls must become no-ops.
     */
    private void openConnection(DataSource curDataSource) throws SQLException {
        // Custom logic: get a connection for the given data source. This also
        // registers the connection's synchronization, which carries the callbacks
        // for each transaction phase.
        this.curConnection = DataSourceUtils.getConnection(curDataSource);
        if (log.isDebugEnabled()) {
            log.debug("Acquired new connection: Connection={}",
                    this.curConnection == null ? "NA" : this.curConnection.hashCode());
        }
        // Record the data source to connection mapping; within one transaction,
        // each data source needs only one open connection.
        Connection preConn = connectionsInTrans.put(curDataSource, curConnection);
        if (preConn != null && preConn != curConnection) {
            throw new IllegalArgumentException("Already has Connection with datasource:" + curDataSource);
        }

        this.autoCommit = this.curConnection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.curConnection, curDataSource);

        log.debug("JDBC Connection [" + this.curConnection + "] will"
                + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void commit() throws SQLException {
        if (this.curConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            log.debug("Committing JDBC Connection [" + this.curConnection + "]");
            this.curConnection.commit();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void rollback() throws SQLException {
        if (this.curConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            log.debug("Rolling back JDBC Connection [" + this.curConnection + "]");
            this.curConnection.rollback();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void close() throws SQLException {
        log.debug("Closing {} database connection(s) used in the transaction", connectionsInTrans.size());

        for (Map.Entry<DataSource, Connection> entry : connectionsInTrans.entrySet()) {
            if (log.isDebugEnabled()) {
                log.debug("Releasing connection: Connection={}",
                        entry.getValue() == null ? "NA" : entry.getValue().hashCode());
            }
            releaseConnection(entry.getValue(), entry.getKey());
        }
    }

    /**
     * Close the given Connection, obtained from the given DataSource, if it is not
     * managed externally (that is, not bound to the thread). Mirrors Spring's
     * {@code DataSourceUtils.releaseConnection}, with extra debug logging.
     *
     * @param con        the Connection to close if necessary (if this is
     *                   {@code null}, the call will be ignored)
     * @param dataSource the DataSource that the Connection was obtained from (may
     *                   be {@code null})
     * @see #getConnection
     */
    public static void releaseConnection(@Nullable Connection con, @Nullable DataSource dataSource) {
        try {
            doReleaseConnection(con, dataSource);
        } catch (SQLException ex) {
            log.debug("Could not close JDBC Connection", ex);
        } catch (Throwable ex) {
            log.debug("Unexpected exception on closing JDBC Connection", ex);
        }
    }

    /**
     * Actually close the given Connection, obtained from the given DataSource. Same
     * as {@link #releaseConnection}, but throwing the original SQLException.
     *
     * @param con        the Connection to close if necessary (if this is
     *                   {@code null}, the call will be ignored)
     * @param dataSource the DataSource that the Connection was obtained from (may
     *                   be {@code null})
     * @throws SQLException if thrown by JDBC methods
     */
    public static void doReleaseConnection(@Nullable Connection con, @Nullable DataSource dataSource)
            throws SQLException {
        if (con == null) {
            return;
        }
        if (dataSource != null) {
            ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
            if (conHolder != null && connectionEquals(conHolder, con)) {
                // It's the transactional Connection: Don't close it.
                conHolder.released();
                if (log.isDebugEnabled()) {
                    log.debug("Connection reference count decremented: Connection={}", con.hashCode());
                }
                return;
            }
        }
        doCloseConnection(con, dataSource);
    }

    /**
     * Determine whether the given two Connections are equal, asking the target
     * Connection in case of a proxy. Used to detect equality even if the
     * user passed in a raw target Connection while the held one is a proxy.
     *
     * @param conHolder   the ConnectionHolder for the held Connection (potentially a proxy)
     * @param passedInCon the Connection passed-in by the user
     *                    (potentially a target Connection without proxy)
     * @return whether the given Connections are equal
     * @see #getTargetConnection
     */
    private static boolean connectionEquals(ConnectionHolder conHolder, Connection passedInCon) {
        if (conHolder.getConnectionHandle() == null) {
            return false;
        }
        Connection heldCon = conHolder.getConnection();
        // Explicitly check for identity too: for Connection handles that do not
        // implement "equals" properly, such as the ones Commons DBCP exposes.
        return (heldCon == passedInCon || heldCon.equals(passedInCon)
                || getTargetConnection(heldCon).equals(passedInCon));
    }

    /**
     * Return the innermost target Connection of the given Connection. If the given
     * Connection is a proxy, it will be unwrapped until a non-proxy Connection is
     * found. Otherwise, the passed-in Connection will be returned as-is.
     *
     * @param con the Connection proxy to unwrap
     * @return the innermost target Connection, or the passed-in one if no proxy
     * @see ConnectionProxy#getTargetConnection()
     */
    public static Connection getTargetConnection(Connection con) {
        Connection conToUse = con;
        while (conToUse instanceof ConnectionProxy connectionProxy) {
            conToUse = connectionProxy.getTargetConnection();
        }
        return conToUse;
    }

    /**
     * Close the Connection, unless a {@link SmartDataSource} doesn't want us to.
     *
     * @param con        the Connection to close if necessary
     * @param dataSource the DataSource that the Connection was obtained from
     * @throws SQLException if thrown by JDBC methods
     * @see Connection#close()
     * @see SmartDataSource#shouldClose(Connection)
     */
    public static void doCloseConnection(Connection con, @Nullable DataSource dataSource) throws SQLException {
        if (!(dataSource instanceof SmartDataSource smartDataSource) || smartDataSource.shouldClose(con)) {
            if (log.isDebugEnabled()) {
                log.debug("[actual] Closing connection: Connection={}", con.hashCode());
            }
            con.close();
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[skipped] Closing connection: Connection={}", con.hashCode());
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Integer getTimeout() throws SQLException {
        ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
        if (holder != null && holder.hasTimeout()) {
            return holder.getTimeToLiveInSeconds();
        }
        return null;
    }

}
```
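
   The release rule used in doReleaseConnection (a connection bound to the current transaction only has its reference count decremented, while any other connection is physically closed) can be modeled in plain Java. This is a simplified sketch under stated assumptions, not Spring's code: ReleaseSketch, FakeConnection, Holder, and the BOUND map are hypothetical stand-ins for the connection, ConnectionHolder, and the resources held by TransactionSynchronizationManager.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of "release vs. close"; all names are hypothetical. */
class ReleaseSketch {

    /** Stand-in for a JDBC connection. */
    static final class FakeConnection {
        boolean closed;

        void close() {
            closed = true;
        }
    }

    /** Stand-in for a holder that counts how many users share a connection. */
    static final class Holder {
        final FakeConnection connection;
        int referenceCount;

        Holder(FakeConnection connection) {
            this.connection = connection;
        }

        void requested() {
            referenceCount++;
        }

        void released() {
            referenceCount--;
        }
    }

    /** Stand-in for the transaction-bound resources, keyed by data source name. */
    static final Map<String, Holder> BOUND = new HashMap<>();

    /** Decrements the count for a bound connection, closes any other connection. */
    static void release(FakeConnection con, String dataSourceKey) {
        if (con == null) {
            return;
        }
        Holder holder = BOUND.get(dataSourceKey);
        if (holder != null && holder.connection == con) {
            holder.released();   // the transaction owner closes it later
            return;
        }
        con.close();             // nobody else manages it, so close it now
    }

    public static void main(String[] args) {
        FakeConnection managed = new FakeConnection();
        Holder holder = new Holder(managed);
        holder.requested();
        BOUND.put("orders", holder);
        release(managed, "orders");
        System.out.println(managed.closed + " " + holder.referenceCount); // prints false 0

        FakeConnection unmanaged = new FakeConnection();
        release(unmanaged, "reports");
        System.out.println(unmanaged.closed);                             // prints true
    }
}
```

   Because close() applies this rule to every entry in connectionsInTrans, connections that Spring bound to the transaction stay open until Spring's own synchronization completes.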
   
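```java
import org.apache.ibatis.transaction.TransactionFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

@AutoConfiguration
@ConditionalOnExpression("${seata.enabled:false} && ${cptop.mybatis.multi-ds-transaction.enabled:true}")
public class MultiDsSpringManagedTransactionConfig {

    /**
     * Registers the MultiDsSpringManagedTransactionFactory bean.
     * <p>
     * It supports multi-data-source transaction management for MyBatis under
     * Seata distributed transactions.
     * <p>
     * {@code @Primary} ensures this TransactionFactory takes precedence over the
     * default SpringManagedTransactionFactory.
     */
    @Bean
    @Primary
    public TransactionFactory multiDsSpringManagedTransactionFactory() {
        return new MultiDsSpringManagedTransactionFactory();
    }

}
```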
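
   The activation condition combines two properties with different defaults: seata.enabled defaults to false and cptop.mybatis.multi-ds-transaction.enabled defaults to true. The plain-Java sketch below only models the resulting truth table; Spring itself resolves the placeholders and evaluates the expression, and the class ActivationSketch and method isActive are hypothetical names for illustration.

```java
import java.util.Properties;

/** Plain-Java model of the condition's defaults; names are hypothetical. */
class ActivationSketch {

    /** Returns true only when Seata is on and the multi-ds toggle is not off. */
    static boolean isActive(Properties props) {
        boolean seataEnabled = Boolean.parseBoolean(
                props.getProperty("seata.enabled", "false"));
        boolean multiDsEnabled = Boolean.parseBoolean(
                props.getProperty("cptop.mybatis.multi-ds-transaction.enabled", "true"));
        return seataEnabled && multiDsEnabled;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(isActive(props));   // prints false: seata.enabled defaults to false

        props.setProperty("seata.enabled", "true");
        System.out.println(isActive(props));   // prints true: the toggle defaults to true

        props.setProperty("cptop.mybatis.multi-ds-transaction.enabled", "false");
        System.out.println(isActive(props));   // prints false: explicitly switched off
    }
}
```

   In other words, enabling Seata is enough to activate the factory, and the second property acts as an opt-out switch.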
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

