This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new a9e19b78e7 feature: Reuse connection to merge branch transactions (#7509)
a9e19b78e7 is described below

commit a9e19b78e7fe8ebf081963dcd3ed958593e2146d
Author: PeppaO <[email protected]>
AuthorDate: Wed Aug 13 13:38:30 2025 +0800

    feature: Reuse connection to merge branch transactions (#7509)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   1 +
 .../org/apache/seata/core/context/RootContext.java |  18 +++
 integration-tx-api/pom.xml                         |   1 -
 .../tx/api/interceptor/InvocationHandlerType.java  |   7 +-
 .../CombineTransactionalInterceptorHandler.java    | 135 +++++++++++++++++++++
 .../CombineTransactionalInterceptorParser.java     |  91 ++++++++++++++
 .../annotation/CombineTransactional.java}          |  32 ++---
 ...ation.tx.api.interceptor.parser.InterfaceParser |   3 +-
 .../interceptor/parser/BusinessCombineImpl.java}   |  33 +++--
 .../CombineTransactionalInterceptorParserTest.java |  71 +++++++++++
 .../parser/ProxyUtilsGlobalTransactionalTest.java  |  11 ++
 .../combine/CombineConnectionHolder.java           |  61 ++++++++++
 .../seata/rm/datasource/xa/ConnectionProxyXA.java  |  17 +++
 .../seata/rm/datasource/xa/DataSourceProxyXA.java  |  19 ++-
 .../combine/CombineConnectionHolderTest.java       | 122 +++++++++++++++++++
 .../rm/datasource/xa/DataSourceProxyXATest.java    |  62 ++++++++--
 17 files changed, 636 insertions(+), 49 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 99ea9240c8..52df620a7a 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -21,6 +21,7 @@ Add changes here for all PR submitted to the 2.x branch.
 ### feature:
 
 - [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http request filter for seata-server
+- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse connection to merge branch transactions
 - [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP client in common module to support HTTP/2
 
 
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index f87de6e290..e53d7a531d 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -21,6 +21,7 @@
 ### feature:
 
 - [[#7485](https://github.com/apache/incubator-seata/pull/7485)] 给seata-server端的http请求添加过滤器
+- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
 - [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common 模块中的 HTTP 客户端以支持 HTTP/2
 
 
diff --git a/core/src/main/java/org/apache/seata/core/context/RootContext.java 
b/core/src/main/java/org/apache/seata/core/context/RootContext.java
index d8f68669ea..a34eaabbd3 100644
--- a/core/src/main/java/org/apache/seata/core/context/RootContext.java
+++ b/core/src/main/java/org/apache/seata/core/context/RootContext.java
@@ -91,6 +91,8 @@ public class RootContext {
 
     private static BranchType DEFAULT_BRANCH_TYPE;
 
+    public static final String KEY_COMBINE_TRANSACTION_FLAG = "TX_COMBINE";
+
     public static void setDefaultBranchType(BranchType defaultBranchType) {
         if (defaultBranchType != AT && defaultBranchType != XA) {
             throw new IllegalArgumentException("The default branch type must 
be " + AT + " or " + XA + "."
@@ -207,6 +209,10 @@ public class RootContext {
         return BranchType.SAGA == getBranchType();
     }
 
+    public static boolean inXABranch() {
+        return BranchType.XA == getBranchType();
+    }
+
     /**
      * get the branch type
      *
@@ -282,4 +288,16 @@ public class RootContext {
     public static Map<String, Object> entries() {
         return CONTEXT_HOLDER.entries();
     }
+
+    public static boolean inCombineTransaction() {
+        return CONTEXT_HOLDER.get(KEY_COMBINE_TRANSACTION_FLAG) != null;
+    }
+
+    public static void bindCombineTransaction() {
+        CONTEXT_HOLDER.put(KEY_COMBINE_TRANSACTION_FLAG, true);
+    }
+
+    public static void unbindCombineTransaction() {
+        CONTEXT_HOLDER.remove(KEY_COMBINE_TRANSACTION_FLAG);
+    }
 }
diff --git a/integration-tx-api/pom.xml b/integration-tx-api/pom.xml
index 624edb9fd5..69970790ae 100644
--- a/integration-tx-api/pom.xml
+++ b/integration-tx-api/pom.xml
@@ -61,7 +61,6 @@
             <groupId>net.bytebuddy</groupId>
             <artifactId>byte-buddy</artifactId>
         </dependency>
-        
     </dependencies>
     
 </project>
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
index d9ac494c4c..2ca0646ef7 100644
--- 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
@@ -34,5 +34,10 @@ public enum InvocationHandlerType {
     /**
      * SagaAnnotation InvocationHandler
      */
-    SagaAnnotation
+    SagaAnnotation,
+
+    /**
+     * CombineTransactional InvocationHandler
+     */
+    CombineTransactional
 }
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/CombineTransactionalInterceptorHandler.java
 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/CombineTransactionalInterceptorHandler.java
new file mode 100644
index 0000000000..05f1381087
--- /dev/null
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/CombineTransactionalInterceptorHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.tx.api.interceptor.handler;
+
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.integration.tx.api.interceptor.InvocationHandlerType;
+import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper;
+import 
org.apache.seata.integration.tx.api.interceptor.SeataInterceptorPosition;
+import org.apache.seata.integration.tx.api.util.ClassUtils;
+import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
+import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * The type Combine transactional interceptor handler.
+ *
+ */
+public class CombineTransactionalInterceptorHandler extends 
AbstractProxyInvocationHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CombineTransactionalInterceptorHandler.class);
+
+    private Set<String> methodsToProxy;
+
+    private static volatile ScheduledThreadPoolExecutor executor;
+
+    public CombineTransactionalInterceptorHandler(Set<String> methodsToProxy) {
+        this.methodsToProxy = methodsToProxy;
+    }
+
+    @Override
+    protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
+        Class<?> targetClass = invocation.getTarget().getClass();
+        Method specificMethod = 
ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
+        if (specificMethod != null && 
!specificMethod.getDeclaringClass().equals(Object.class)) {
+            return handleCombineTransactional(invocation);
+        }
+        return invocation.proceed();
+    }
+
+    private Object handleCombineTransactional(final InvocationWrapper 
methodInvocation) throws Throwable {
+        if (!RootContext.inGlobalTransaction()) {
+            // not in transaction, or this interceptor is disabled
+            return methodInvocation.proceed();
+        }
+
+        RootContext.bindCombineTransaction();
+
+        try {
+            Object result = methodInvocation.proceed();
+
+            for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) 
{
+                conn.setCombine(false);
+                conn.commit();
+            }
+            return result;
+        } catch (Exception e) {
+            LOGGER.error(
+                    String.format(
+                            "@CombineTransactional failed to handle,xid: %s 
occur exp msg: %s",
+                            RootContext.getXID(), e.getMessage()),
+                    e);
+            // doRollback
+            for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) 
{
+                conn.setCombine(false);
+                conn.rollback();
+            }
+            throw e;
+        } finally {
+            for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) 
{
+                try {
+                    // Reset autocommit (if not autocommitting)
+                    if (!conn.getAutoCommit()) {
+                        conn.setAutoCommit(true);
+                    }
+                } catch (Throwable t) {
+                    // Record the exception of resetting the auto-commit, but 
do not interrupt and continue to try to
+                    // close
+                    LOGGER.error("Failed to reset autoCommit to true for 
connection: {}", conn, t);
+                }
+                try {
+                    if (conn.isClosed()) {
+                        LOGGER.warn("Connection is closed: {}", conn);
+                    }
+                    conn.close();
+                } catch (Throwable t) {
+                    // Record the exception of closing the connection, but do 
not interrupt the loop and continue to
+                    // process the next connection
+                    LOGGER.error("Failed to close connection: {}", conn, t);
+                }
+            }
+            // Clean up local cache connections
+            CombineConnectionHolder.clear();
+            RootContext.unbindCombineTransaction();
+        }
+    }
+
+    @Override
+    public Set<String> getMethodsToProxy() {
+        return methodsToProxy;
+    }
+
+    @Override
+    public SeataInterceptorPosition getPosition() {
+        return SeataInterceptorPosition.AfterTransaction;
+    }
+
+    @Override
+    public String type() {
+        return InvocationHandlerType.CombineTransactional.name();
+    }
+
+    @Override
+    public int order() {
+        return 1;
+    }
+}
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParser.java
 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParser.java
new file mode 100644
index 0000000000..622eee3cc6
--- /dev/null
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.tx.api.interceptor.parser;
+
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.common.util.ReflectionUtil;
+import 
org.apache.seata.integration.tx.api.interceptor.handler.CombineTransactionalInterceptorHandler;
+import 
org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
+import org.apache.seata.spring.annotation.CombineTransactional;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+public class CombineTransactionalInterceptorParser implements InterfaceParser {
+
+    protected final Set<String> methodsToProxy = new HashSet<>();
+
+    /**
+     * @param target
+     * @return
+     * @throws Exception
+     * @see CombineTransactional // Combine annotation
+     */
+    @Override
+    public ProxyInvocationHandler parserInterfaceToProxy(Object target, String 
objectName) throws Exception {
+        Class<?> serviceInterface = 
DefaultTargetClassParser.get().findTargetClass(target);
+        Class<?>[] interfacesIfJdk = 
DefaultTargetClassParser.get().findInterfaces(target);
+
+        if (existsAnnotation(serviceInterface) || 
existsAnnotation(interfacesIfJdk)) {
+            return createProxyInvocationHandler();
+        }
+
+        return null;
+    }
+
+    protected ProxyInvocationHandler createProxyInvocationHandler() {
+        return new CombineTransactionalInterceptorHandler(methodsToProxy);
+    }
+
+    @Override
+    public IfNeedEnhanceBean parseIfNeedEnhancement(Class<?> beanClass) {
+        Set<Class<?>> interfaceClasses = 
ReflectionUtil.getInterfaces(beanClass);
+        Class<?>[] interfaceClasseArray = interfaceClasses.toArray(new 
Class<?>[0]);
+
+        IfNeedEnhanceBean ifNeedEnhanceBean = new IfNeedEnhanceBean();
+        if (existsAnnotation(beanClass) || 
existsAnnotation(interfaceClasseArray)) {
+            ifNeedEnhanceBean.setIfNeed(true);
+            ifNeedEnhanceBean.setNeedEnhanceEnum(NeedEnhanceEnum.SERVICE_BEAN);
+        }
+        return ifNeedEnhanceBean;
+    }
+
+    protected boolean existsAnnotation(Class<?>... classes) {
+        boolean result = false;
+        if (CollectionUtils.isNotEmpty(classes)) {
+            for (Class<?> clazz : classes) {
+                if (clazz == null) {
+                    continue;
+                }
+                CombineTransactional trxAnno = 
clazz.getAnnotation(CombineTransactional.class);
+                if (trxAnno != null) {
+                    return true;
+                }
+                Method[] methods = clazz.getMethods();
+                for (Method method : methods) {
+                    trxAnno = method.getAnnotation(CombineTransactional.class);
+                    if (trxAnno != null) {
+                        methodsToProxy.add(method.getName());
+                        result = true;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
 
b/integration-tx-api/src/main/java/org/apache/seata/spring/annotation/CombineTransactional.java
similarity index 63%
copy from 
integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
copy to 
integration-tx-api/src/main/java/org/apache/seata/spring/annotation/CombineTransactional.java
index d9ac494c4c..bda148d401 100644
--- 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/spring/annotation/CombineTransactional.java
@@ -14,25 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seata.integration.tx.api.interceptor;
+package org.apache.seata.spring.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 /**
- * The  InvocationHandlerType enum
+ * The interface Combine transactional.
+ * This annotation is only valid in XA mode.
  */
-public enum InvocationHandlerType {
-
-    /**
-     * GlobalTransactional InvocationHandler
-     */
-    GlobalTransactional,
-
-    /**
-     * TwoPhase InvocationHandler
-     */
-    TwoPhaseAnnotation,
-
-    /**
-     * SagaAnnotation InvocationHandler
-     */
-    SagaAnnotation
-}
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface CombineTransactional {}
diff --git 
a/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
 
b/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
index a11f737b99..402030fc8d 100644
--- 
a/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
+++ 
b/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
@@ -14,4 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
\ No newline at end of file
+org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
+org.apache.seata.integration.tx.api.interceptor.parser.CombineTransactionalInterceptorParser
\ No newline at end of file
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
 
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/BusinessCombineImpl.java
similarity index 56%
copy from 
integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
copy to 
integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/BusinessCombineImpl.java
index d9ac494c4c..2a4166a509 100644
--- 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
+++ 
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/BusinessCombineImpl.java
@@ -14,25 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seata.integration.tx.api.interceptor;
+package org.apache.seata.integration.tx.api.interceptor.parser;
+
+import org.apache.seata.spring.annotation.CombineTransactional;
+import org.apache.seata.spring.annotation.GlobalTransactional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * The  InvocationHandlerType enum
+ * The type Business.
  */
-public enum InvocationHandlerType {
-
-    /**
-     * GlobalTransactional InvocationHandler
-     */
-    GlobalTransactional,
-
-    /**
-     * TwoPhase InvocationHandler
-     */
-    TwoPhaseAnnotation,
+@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
+@CombineTransactional
+public class BusinessCombineImpl implements Business {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BusinessCombineImpl.class);
 
-    /**
-     * SagaAnnotation InvocationHandler
-     */
-    SagaAnnotation
+    @Override
+    public String doBiz(String msg) {
+        LOGGER.info("Business doBiz");
+        return "hello " + msg;
+    }
 }
diff --git 
a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParserTest.java
 
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParserTest.java
new file mode 100644
index 0000000000..a7a9e88b74
--- /dev/null
+++ 
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParserTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.tx.api.interceptor.parser;
+
+import 
org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler;
+import 
org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CombineTransactionalInterceptorParserTest {
+
+    @Test
+    void parserInterfaceToProxy() throws Exception {
+
+        // given
+        BusinessCombineImpl business = new BusinessCombineImpl();
+
+        GlobalTransactionalInterceptorParser 
globalTransactionalInterceptorParser =
+                new GlobalTransactionalInterceptorParser();
+
+        // when
+        ProxyInvocationHandler proxyInvocationHandler = 
globalTransactionalInterceptorParser.parserInterfaceToProxy(
+                business, business.getClass().getName());
+
+        // then
+        Assertions.assertNotNull(proxyInvocationHandler);
+
+        // given
+        CombineTransactionalInterceptorParser 
combineTransactionalInterceptorParser =
+                new CombineTransactionalInterceptorParser();
+
+        // when
+        ProxyInvocationHandler proxyInvocationHandler2 = 
combineTransactionalInterceptorParser.parserInterfaceToProxy(
+                business, business.getClass().getName());
+
+        // then
+        Assertions.assertNotNull(proxyInvocationHandler2);
+    }
+
+    @Test
+    void parserInterfaceToProxyGlobalTransactionalIsFirst() throws Exception {
+        // given
+        BusinessCombineImpl business = new BusinessCombineImpl();
+
+        // when
+        ProxyInvocationHandler proxyInvocationHandler = 
DefaultInterfaceParser.get()
+                .parserInterfaceToProxy(business, 
business.getClass().getName());
+
+        // then
+        assertThat(proxyInvocationHandler)
+                .as("Proxy order verification")
+                .isNotNull()
+                .isInstanceOf(GlobalTransactionalInterceptorHandler.class);
+    }
+}
diff --git 
a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
 
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
index 66d78409e4..a18bd27c8b 100644
--- 
a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
+++ 
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
@@ -81,4 +81,15 @@ public class ProxyUtilsGlobalTransactionalTest {
 
         Assertions.assertNotNull(result);
     }
+
+    @Test
+    public void testXAWithCombineTransaction() {
+        BusinessCombineImpl businessCombine = new BusinessCombineImpl();
+
+        Business businessProxy = ProxyUtil.createProxy(businessCombine);
+
+        String result = businessProxy.doBiz("test");
+
+        Assertions.assertNotNull(result);
+    }
 }
diff --git 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
new file mode 100644
index 0000000000..3bd6ab1fc8
--- /dev/null
+++ 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.combine;
+
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CombineConnectionHolder {
+    private static final ThreadLocal<Map<String, Map<Object, 
ConnectionProxyXA>>> CONNECTION_HOLDER =
+            ThreadLocal.withInitial(ConcurrentHashMap::new);
+
+    public static ConnectionProxyXA get(DataSource dataSource) {
+        Map<Object, ConnectionProxyXA> connMap = 
CONNECTION_HOLDER.get().get(RootContext.getXID());
+        if (connMap != null) {
+            return connMap.get(dataSource);
+        }
+        return null;
+    }
+
+    public static Collection<ConnectionProxyXA> getDsConn() {
+        Map<Object, ConnectionProxyXA> connectionMap = 
CONNECTION_HOLDER.get().get(RootContext.getXID());
+        return connectionMap != null ? connectionMap.values() : 
Collections.emptyList();
+    }
+
+    public static void putConnection(DataSource dataSource, ConnectionProxyXA 
connection) throws SQLException {
+        Map<String, Map<Object, ConnectionProxyXA>> concurrentHashMap = 
CONNECTION_HOLDER.get();
+        String xid = RootContext.getXID();
+        Map<Object, ConnectionProxyXA> connectionProxyMap =
+                concurrentHashMap.computeIfAbsent(xid, k -> new 
ConcurrentHashMap<>());
+
+        if (connectionProxyMap.putIfAbsent(dataSource, connection) == null) {
+            connection.setAutoCommit(false);
+            connection.setCombine(true);
+        }
+    }
+
+    public static void clear() {
+        CONNECTION_HOLDER.get().remove(RootContext.getXID());
+    }
+}
diff --git 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
index 6ca2ec3186..ff7f5b3294 100644
--- 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
+++ 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
@@ -73,6 +73,8 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
 
     private final ResourceLock resourceLock = new ResourceLock();
 
+    private volatile boolean combine = false;
+
     /**
      * Constructor of Connection Proxy for XA mode.
      *
@@ -161,6 +163,7 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
      * @throws XAException XAException
      */
     public void xaRollback(XAXid xaXid) throws XAException {
+        xaEnd(xaXid, XAResource.TMFAIL);
         xaResource.rollback(xaXid);
         releaseIfNecessary();
     }
@@ -227,6 +230,9 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
     @Override
     public void commit() throws SQLException {
         try (ResourceLock ignored = resourceLock.obtain()) {
+            if (combine) {
+                return;
+            }
             if (currentAutoCommitStatus || isReadOnly()) {
                 // Ignore the committing on an autocommit session and 
read-only transaction.
                 return;
@@ -239,6 +245,9 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
 
     @Override
     public void rollback() throws SQLException {
+        if (combine) {
+            return;
+        }
         if (currentAutoCommitStatus || isReadOnly()) {
             // Ignore the committing on an autocommit session and read-only 
transaction.
             return;
@@ -300,6 +309,7 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
         if (!isHeld()) {
             xaBranchXid = null;
         }
+        combine = false;
     }
 
     private void checkTimeout(Long now) throws XAException {
@@ -312,6 +322,9 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
     @Override
     public void close() throws SQLException {
         try (ResourceLock ignored = resourceLock.obtain()) {
+            if (combine) {
+                return;
+            }
             try {
                 if (xaActive && this.xaBranchXid != null) {
                     // XA End: Success
@@ -435,4 +448,8 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
     public ResourceLock getResourceLock() {
         return resourceLock;
     }
+
+    public void setCombine(boolean combine) {
+        this.combine = combine;
+    }
 }
diff --git 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
index 8bef4bea15..38c989359c 100644
--- 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
+++ 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
@@ -22,6 +22,7 @@ import org.apache.seata.core.model.BranchType;
 import org.apache.seata.core.protocol.Version;
 import org.apache.seata.rm.DefaultResourceManager;
 import org.apache.seata.rm.datasource.SeataDataSourceProxy;
+import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
 import org.apache.seata.rm.datasource.util.JdbcUtils;
 import org.apache.seata.rm.datasource.util.XAUtils;
 import org.slf4j.Logger;
@@ -87,12 +88,24 @@ public class DataSourceProxyXA extends 
AbstractDataSourceProxyXA {
 
     @Override
     public Connection getConnection() throws SQLException {
+        if (RootContext.inGlobalTransaction() && 
RootContext.inCombineTransaction()) {
+            ConnectionProxyXA connectionProxyXA = 
CombineConnectionHolder.get(this.dataSource);
+            if (connectionProxyXA != null && !connectionProxyXA.isClosed()) {
+                return connectionProxyXA;
+            }
+        }
         Connection connection = dataSource.getConnection();
         return getConnectionProxy(connection);
     }
 
     @Override
     public Connection getConnection(String username, String password) throws 
SQLException {
+        if (RootContext.inGlobalTransaction() && 
RootContext.inCombineTransaction()) {
+            ConnectionProxyXA connectionProxyXA = 
CombineConnectionHolder.get(this.dataSource);
+            if (connectionProxyXA != null && !connectionProxyXA.isClosed()) {
+                return connectionProxyXA;
+            }
+        }
         Connection connection = dataSource.getConnection(username, password);
         return getConnectionProxy(connection);
     }
@@ -101,7 +114,11 @@ public class DataSourceProxyXA extends 
AbstractDataSourceProxyXA {
         if (!RootContext.inGlobalTransaction()) {
             return connection;
         }
-        return getConnectionProxyXA(connection);
+        ConnectionProxyXA connectionProxyXA = (ConnectionProxyXA) 
getConnectionProxyXA(connection);
+        if (RootContext.inCombineTransaction()) {
+            CombineConnectionHolder.putConnection(this.dataSource, 
connectionProxyXA);
+        }
+        return connectionProxyXA;
     }
 
     @Override
diff --git 
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolderTest.java
 
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolderTest.java
new file mode 100644
index 0000000000..d8b04ad9c3
--- /dev/null
+++ 
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.combine;
+
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+
+public class CombineConnectionHolderTest {
+
+    @Mock
+    private DataSource dataSource;
+
+    @Mock
+    private ConnectionProxyXA connectionProxyXA;
+
+    private AutoCloseable closeable;
+    private MockedStatic<RootContext> mockedRootContext;
+
+    @BeforeEach
+    public void init() {
+        closeable = MockitoAnnotations.openMocks(this);
+        mockedRootContext = Mockito.mockStatic(RootContext.class);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        closeable.close();
+        mockedRootContext.close();
+    }
+
+    @Test
+    public void testGet() throws SQLException {
+        String xid = "test-xid";
+        mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+
+        assertNull(CombineConnectionHolder.get(dataSource));
+
+        RootContext.bind(xid);
+        CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+        assertSame(connectionProxyXA, CombineConnectionHolder.get(dataSource));
+        CombineConnectionHolder.clear();
+    }
+
+    @Test
+    public void testPutConnection() throws SQLException {
+        String xid = "test-xid";
+        mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+        RootContext.bind(xid);
+        CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+        ConnectionProxyXA getConnectionProxyXA = 
CombineConnectionHolder.get(dataSource);
+        assertNotNull(getConnectionProxyXA);
+        assertSame(connectionProxyXA, getConnectionProxyXA);
+
+        verify(connectionProxyXA).setAutoCommit(false);
+        verify(connectionProxyXA).setCombine(true);
+        CombineConnectionHolder.clear();
+    }
+
+    @Test
+    public void testGetDsConn() throws SQLException {
+        String xid = "test-xid";
+        mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+
+        Collection<ConnectionProxyXA> connections = 
CombineConnectionHolder.getDsConn();
+        assertTrue(connections.isEmpty());
+
+        RootContext.bind(xid);
+        CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+        connections = CombineConnectionHolder.getDsConn();
+        assertEquals(1, connections.size());
+        assertSame(connectionProxyXA, connections.iterator().next());
+        CombineConnectionHolder.clear();
+    }
+
+    @Test
+    public void testClear() throws SQLException {
+        String xid = "test-xid";
+        mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+
+        RootContext.bind(xid);
+        CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+        CombineConnectionHolder.clear();
+
+        assertNull(CombineConnectionHolder.get(dataSource));
+    }
+}
diff --git 
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
 
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
index b1121ed806..457472d615 100644
--- 
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
+++ 
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
@@ -21,11 +21,14 @@ import com.kingbase8.xa.KBXAConnection;
 import com.mysql.jdbc.JDBC4MySQLConnection;
 import com.mysql.jdbc.jdbc2.optional.JDBC4ConnectionWrapper;
 import org.apache.seata.core.context.RootContext;
+import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
 import org.apache.seata.rm.datasource.mock.MockDataSource;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 import javax.sql.DataSource;
@@ -37,11 +40,20 @@ import java.sql.Driver;
 import java.sql.SQLException;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for DataSourceProxyXA
  */
 public class DataSourceProxyXATest {
+    @BeforeEach
+    public void setUp() {
+        // Clean up context before each test
+        RootContext.unbind();
+        RootContext.unbindBranchType();
+        RootContext.unbindCombineTransaction();
+    }
 
     @Test
     public void test_constructor() {
@@ -57,10 +69,10 @@ public class DataSourceProxyXATest {
     @Test
     public void testGetConnection() throws SQLException {
         // Mock
-        Driver driver = Mockito.mock(Driver.class);
-        JDBC4MySQLConnection connection = 
Mockito.mock(JDBC4MySQLConnection.class);
+        Driver driver = mock(Driver.class);
+        JDBC4MySQLConnection connection = mock(JDBC4MySQLConnection.class);
         Mockito.when(connection.getAutoCommit()).thenReturn(true);
-        DatabaseMetaData metaData = Mockito.mock(DatabaseMetaData.class);
+        DatabaseMetaData metaData = mock(DatabaseMetaData.class);
         Mockito.when(metaData.getURL()).thenReturn("jdbc:mysql:xxx");
         Mockito.when(connection.getMetaData()).thenReturn(metaData);
         Mockito.when(driver.connect(any(), any())).thenReturn(connection);
@@ -91,11 +103,11 @@ public class DataSourceProxyXATest {
     @Test
     public void testGetMariaXaConnection() throws SQLException, 
ClassNotFoundException {
         // Mock
-        Driver driver = Mockito.mock(Driver.class);
+        Driver driver = mock(Driver.class);
         Class clazz = Class.forName("org.mariadb.jdbc.MariaDbConnection");
-        Connection connection = (Connection) (Mockito.mock(clazz));
+        Connection connection = (Connection) (mock(clazz));
         Mockito.when(connection.getAutoCommit()).thenReturn(true);
-        DatabaseMetaData metaData = Mockito.mock(DatabaseMetaData.class);
+        DatabaseMetaData metaData = mock(DatabaseMetaData.class);
         Mockito.when(metaData.getURL()).thenReturn("jdbc:mariadb:xxx");
         Mockito.when(connection.getMetaData()).thenReturn(metaData);
         Mockito.when(driver.connect(any(), any())).thenReturn(connection);
@@ -131,11 +143,11 @@ public class DataSourceProxyXATest {
             disabledReason = "druid 1.2.8 correct support kingbase")
     public void testGetKingbaseXaConnection() throws SQLException, 
ClassNotFoundException {
         // Mock
-        Driver driver = Mockito.mock(Driver.class);
+        Driver driver = mock(Driver.class);
         Class clazz = Class.forName("com.kingbase8.jdbc.KbConnection");
-        Connection connection = (Connection) (Mockito.mock(clazz));
+        Connection connection = (Connection) (mock(clazz));
         Mockito.when(connection.getAutoCommit()).thenReturn(true);
-        DatabaseMetaData metaData = Mockito.mock(DatabaseMetaData.class);
+        DatabaseMetaData metaData = mock(DatabaseMetaData.class);
         Mockito.when(metaData.getURL()).thenReturn("jdbc:kingbase8:xxx");
         Mockito.when(connection.getMetaData()).thenReturn(metaData);
         Mockito.when(driver.connect(any(), any())).thenReturn(connection);
@@ -162,6 +174,38 @@ public class DataSourceProxyXATest {
         tearDown();
     }
 
+    @Test
+    public void testGetConnectionInCombineMode() throws SQLException {
+        RootContext.bind("testXID");
+        RootContext.bindCombineTransaction();
+
+        ConnectionProxyXA combineConn = mock(ConnectionProxyXA.class);
+        when(combineConn.isClosed()).thenReturn(false);
+
+        try (MockedStatic<CombineConnectionHolder> holderMock = 
Mockito.mockStatic(CombineConnectionHolder.class)) {
+            holderMock
+                    .when(() -> 
CombineConnectionHolder.get(any(DataSource.class)))
+                    .thenReturn(combineConn);
+            Driver driver = mock(Driver.class);
+            JDBC4MySQLConnection connection = mock(JDBC4MySQLConnection.class);
+            Mockito.when(connection.getAutoCommit()).thenReturn(true);
+            DatabaseMetaData metaData = mock(DatabaseMetaData.class);
+            Mockito.when(metaData.getURL()).thenReturn("jdbc:mysql:xxx");
+            Mockito.when(connection.getMetaData()).thenReturn(metaData);
+            Mockito.when(driver.connect(any(), any())).thenReturn(connection);
+
+            DruidDataSource realDataSource = new DruidDataSource();
+            realDataSource.setDriver(driver);
+            DataSourceProxyXA proxyDataSource = new 
DataSourceProxyXA(realDataSource);
+
+            Connection result = proxyDataSource.getConnection();
+
+            Assertions.assertEquals(combineConn, result);
+
+            holderMock.verify(() -> 
CombineConnectionHolder.get(realDataSource));
+        }
+    }
+
     @AfterAll
     public static void tearDown() {
         RootContext.unbind();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to