This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new a9e19b78e7 feature: Reuse connection to merge branch transactions
(#7509)
a9e19b78e7 is described below
commit a9e19b78e7fe8ebf081963dcd3ed958593e2146d
Author: PeppaO <[email protected]>
AuthorDate: Wed Aug 13 13:38:30 2025 +0800
feature: Reuse connection to merge branch transactions (#7509)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
.../org/apache/seata/core/context/RootContext.java | 18 +++
integration-tx-api/pom.xml | 1 -
.../tx/api/interceptor/InvocationHandlerType.java | 7 +-
.../CombineTransactionalInterceptorHandler.java | 135 +++++++++++++++++++++
.../CombineTransactionalInterceptorParser.java | 91 ++++++++++++++
.../annotation/CombineTransactional.java} | 32 ++---
...ation.tx.api.interceptor.parser.InterfaceParser | 3 +-
.../interceptor/parser/BusinessCombineImpl.java} | 33 +++--
.../CombineTransactionalInterceptorParserTest.java | 71 +++++++++++
.../parser/ProxyUtilsGlobalTransactionalTest.java | 11 ++
.../combine/CombineConnectionHolder.java | 61 ++++++++++
.../seata/rm/datasource/xa/ConnectionProxyXA.java | 17 +++
.../seata/rm/datasource/xa/DataSourceProxyXA.java | 19 ++-
.../combine/CombineConnectionHolderTest.java | 122 +++++++++++++++++++
.../rm/datasource/xa/DataSourceProxyXATest.java | 62 ++++++++--
17 files changed, 636 insertions(+), 49 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 99ea9240c8..52df620a7a 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -21,6 +21,7 @@ Add changes here for all PR submitted to the 2.x branch.
### feature:
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http
request filter for seata-server
+- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse
connection to merge branch transactions
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP
client in common module to support HTTP/2
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index f87de6e290..e53d7a531d 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -21,6 +21,7 @@
### feature:
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)]
给seata-server端的http请求添加过滤器
+- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common
模块中的 HTTP 客户端以支持 HTTP/2
diff --git a/core/src/main/java/org/apache/seata/core/context/RootContext.java
b/core/src/main/java/org/apache/seata/core/context/RootContext.java
index d8f68669ea..a34eaabbd3 100644
--- a/core/src/main/java/org/apache/seata/core/context/RootContext.java
+++ b/core/src/main/java/org/apache/seata/core/context/RootContext.java
@@ -91,6 +91,8 @@ public class RootContext {
private static BranchType DEFAULT_BRANCH_TYPE;
+ public static final String KEY_COMBINE_TRANSACTION_FLAG = "TX_COMBINE";
+
public static void setDefaultBranchType(BranchType defaultBranchType) {
if (defaultBranchType != AT && defaultBranchType != XA) {
throw new IllegalArgumentException("The default branch type must
be " + AT + " or " + XA + "."
@@ -207,6 +209,10 @@ public class RootContext {
return BranchType.SAGA == getBranchType();
}
+ /**
+ * Tells whether the current branch type is XA.
+ *
+ * @return true when {@link #getBranchType()} returns {@link BranchType#XA}
+ */
+ public static boolean inXABranch() {
+ return BranchType.XA == getBranchType();
+ }
+
/**
* get the branch type
*
@@ -282,4 +288,16 @@ public class RootContext {
public static Map<String, Object> entries() {
return CONTEXT_HOLDER.entries();
}
+
+ /**
+ * Tells whether the combine-transaction flag is present in the context.
+ *
+ * @return true when a value is stored under {@link #KEY_COMBINE_TRANSACTION_FLAG}
+ */
+ public static boolean inCombineTransaction() {
+ return CONTEXT_HOLDER.get(KEY_COMBINE_TRANSACTION_FLAG) != null;
+ }
+
+ /**
+ * Sets the combine-transaction flag by storing {@code true} under
+ * {@link #KEY_COMBINE_TRANSACTION_FLAG} in the context holder.
+ */
+ public static void bindCombineTransaction() {
+ CONTEXT_HOLDER.put(KEY_COMBINE_TRANSACTION_FLAG, true);
+ }
+
+ /**
+ * Clears the combine-transaction flag by removing {@link #KEY_COMBINE_TRANSACTION_FLAG}
+ * from the context holder.
+ */
+ public static void unbindCombineTransaction() {
+ CONTEXT_HOLDER.remove(KEY_COMBINE_TRANSACTION_FLAG);
+ }
}
diff --git a/integration-tx-api/pom.xml b/integration-tx-api/pom.xml
index 624edb9fd5..69970790ae 100644
--- a/integration-tx-api/pom.xml
+++ b/integration-tx-api/pom.xml
@@ -61,7 +61,6 @@
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</dependency>
-
</dependencies>
</project>
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
index d9ac494c4c..2ca0646ef7 100644
---
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
+++
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
@@ -34,5 +34,10 @@ public enum InvocationHandlerType {
/**
* SagaAnnotation InvocationHandler
*/
- SagaAnnotation
+ SagaAnnotation,
+
+ /**
+ * CombineTransactional InvocationHandler
+ */
+ CombineTransactional
}
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/CombineTransactionalInterceptorHandler.java
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/CombineTransactionalInterceptorHandler.java
new file mode 100644
index 0000000000..05f1381087
--- /dev/null
+++
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/CombineTransactionalInterceptorHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.tx.api.interceptor.handler;
+
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.integration.tx.api.interceptor.InvocationHandlerType;
+import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper;
+import
org.apache.seata.integration.tx.api.interceptor.SeataInterceptorPosition;
+import org.apache.seata.integration.tx.api.util.ClassUtils;
+import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
+import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * Interceptor handler backing {@code @CombineTransactional}.
+ *
+ * <p>Inside a global transaction the intercepted call runs with the combine flag bound in
+ * {@link RootContext}. Afterwards every {@link ConnectionProxyXA} that {@link CombineConnectionHolder}
+ * holds for the current XID is committed (normal return) or rolled back (any throwable), then reset
+ * to auto-commit and closed; finally the holder entry and the combine flag are removed.
+ */
+public class CombineTransactionalInterceptorHandler extends AbstractProxyInvocationHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CombineTransactionalInterceptorHandler.class);
+
+    private final Set<String> methodsToProxy;
+
+    // NOTE(review): never read or assigned in this class; kept so the declaration and its import stay in sync.
+    private static volatile ScheduledThreadPoolExecutor executor;
+
+    /**
+     * @param methodsToProxy the method names reported by {@link #getMethodsToProxy()}
+     */
+    public CombineTransactionalInterceptorHandler(Set<String> methodsToProxy) {
+        this.methodsToProxy = methodsToProxy;
+    }
+
+    /**
+     * Applies the combine logic unless the most specific target method cannot be resolved or is
+     * declared by {@link Object}; in those cases the invocation simply proceeds.
+     */
+    @Override
+    protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
+        Class<?> targetClass = invocation.getTarget().getClass();
+        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
+        if (specificMethod == null || specificMethod.getDeclaringClass().equals(Object.class)) {
+            return invocation.proceed();
+        }
+        return handleCombineTransactional(invocation);
+    }
+
+    /**
+     * Runs the invocation inside a combine scope and finishes all combined connections afterwards.
+     *
+     * @param methodInvocation the intercepted invocation
+     * @return whatever the invocation returns
+     * @throws Throwable the throwable raised by the invocation or by committing; rollback failures
+     *     are attached to it as suppressed exceptions
+     */
+    private Object handleCombineTransactional(final InvocationWrapper methodInvocation) throws Throwable {
+        if (!RootContext.inGlobalTransaction()) {
+            // not in transaction, or this interceptor is disabled
+            return methodInvocation.proceed();
+        }
+        if (RootContext.inCombineTransaction()) {
+            // Nested call inside an existing combine scope: the outermost invocation owns
+            // commit/rollback/cleanup, otherwise the inner call would finish the outer scope early.
+            return methodInvocation.proceed();
+        }
+
+        RootContext.bindCombineTransaction();
+        try {
+            Object result = methodInvocation.proceed();
+            commitAll();
+            return result;
+        } catch (Throwable e) {
+            // Throwable (not just Exception) so that errors also trigger the rollback.
+            LOGGER.error(
+                    "@CombineTransactional failed to handle,xid: {} occur exp msg: {}",
+                    RootContext.getXID(),
+                    e.getMessage(),
+                    e);
+            rollbackAll(e);
+            throw e;
+        } finally {
+            try {
+                releaseAll();
+                // Clean up local cache connections
+                CombineConnectionHolder.clear();
+            } finally {
+                // Always drop the flag, even if the cleanup above fails.
+                RootContext.unbindCombineTransaction();
+            }
+        }
+    }
+
+    /** Leaves combine mode on every held connection and commits it. */
+    private void commitAll() throws Exception {
+        for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
+            conn.setCombine(false);
+            conn.commit();
+        }
+    }
+
+    /**
+     * Rolls back every held connection; a failure on one connection is logged and attached to
+     * {@code cause} instead of preventing the remaining rollbacks.
+     */
+    private void rollbackAll(Throwable cause) {
+        for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
+            conn.setCombine(false);
+            try {
+                conn.rollback();
+            } catch (Throwable t) {
+                LOGGER.error("Failed to roll back connection: {}", conn, t);
+                if (t != cause) {
+                    cause.addSuppressed(t);
+                }
+            }
+        }
+    }
+
+    /** Best-effort reset to auto-commit and close of every held connection. */
+    private void releaseAll() {
+        for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
+            // close() is skipped while the combine flag is set, so make sure it is off here.
+            conn.setCombine(false);
+            try {
+                // Reset autocommit (if not autocommitting)
+                if (!conn.getAutoCommit()) {
+                    conn.setAutoCommit(true);
+                }
+            } catch (Throwable t) {
+                // Record the failure but still try to close the connection
+                LOGGER.error("Failed to reset autoCommit to true for connection: {}", conn, t);
+            }
+            try {
+                if (conn.isClosed()) {
+                    LOGGER.warn("Connection is closed: {}", conn);
+                }
+                conn.close();
+            } catch (Throwable t) {
+                // Record the failure and continue with the next connection
+                LOGGER.error("Failed to close connection: {}", conn, t);
+            }
+        }
+    }
+
+    @Override
+    public Set<String> getMethodsToProxy() {
+        return methodsToProxy;
+    }
+
+    @Override
+    public SeataInterceptorPosition getPosition() {
+        return SeataInterceptorPosition.AfterTransaction;
+    }
+
+    @Override
+    public String type() {
+        return InvocationHandlerType.CombineTransactional.name();
+    }
+
+    @Override
+    public int order() {
+        return 1;
+    }
+}
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParser.java
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParser.java
new file mode 100644
index 0000000000..622eee3cc6
--- /dev/null
+++
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.tx.api.interceptor.parser;
+
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.common.util.ReflectionUtil;
+import
org.apache.seata.integration.tx.api.interceptor.handler.CombineTransactionalInterceptorHandler;
+import
org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
+import org.apache.seata.spring.annotation.CombineTransactional;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * {@link InterfaceParser} that looks for {@link CombineTransactional} on a target's class, its
+ * interfaces, or their public methods and, when found, supplies a
+ * {@link CombineTransactionalInterceptorHandler}.
+ */
+public class CombineTransactionalInterceptorParser implements InterfaceParser {
+
+    /** Names of annotated methods collected by the most recent scan. */
+    protected final Set<String> methodsToProxy = new HashSet<>();
+
+    /**
+     * Builds a handler for {@code target} when it carries {@link CombineTransactional}.
+     *
+     * @param target the object to inspect
+     * @param objectName the name of the object (not used by this parser)
+     * @return a new handler, or {@code null} when no annotation is present
+     * @throws Exception declared by {@link InterfaceParser}
+     * @see CombineTransactional
+     */
+    @Override
+    public ProxyInvocationHandler parserInterfaceToProxy(Object target, String objectName) throws Exception {
+        Class<?> serviceInterface = DefaultTargetClassParser.get().findTargetClass(target);
+        Class<?>[] interfacesIfJdk = DefaultTargetClassParser.get().findInterfaces(target);
+
+        // Start from an empty set so names gathered for previously inspected classes
+        // do not leak into the handler created for this target.
+        // NOTE(review): the set is plain HashSet state on the parser; confirm parsing is single-threaded.
+        methodsToProxy.clear();
+        if (existsAnnotation(serviceInterface) || existsAnnotation(interfacesIfJdk)) {
+            return createProxyInvocationHandler();
+        }
+
+        return null;
+    }
+
+    /**
+     * Creates the handler with its own snapshot of the collected method names, so later scans
+     * cannot change what an already created handler reports.
+     */
+    protected ProxyInvocationHandler createProxyInvocationHandler() {
+        return new CombineTransactionalInterceptorHandler(new HashSet<>(methodsToProxy));
+    }
+
+    /**
+     * Reports whether {@code beanClass} or any interface returned by
+     * {@link ReflectionUtil#getInterfaces(Class)} carries {@link CombineTransactional}.
+     */
+    @Override
+    public IfNeedEnhanceBean parseIfNeedEnhancement(Class<?> beanClass) {
+        Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(beanClass);
+        Class<?>[] interfaceClassArray = interfaceClasses.toArray(new Class<?>[0]);
+
+        IfNeedEnhanceBean ifNeedEnhanceBean = new IfNeedEnhanceBean();
+        if (existsAnnotation(beanClass) || existsAnnotation(interfaceClassArray)) {
+            ifNeedEnhanceBean.setIfNeed(true);
+            ifNeedEnhanceBean.setNeedEnhanceEnum(NeedEnhanceEnum.SERVICE_BEAN);
+        }
+        return ifNeedEnhanceBean;
+    }
+
+    /**
+     * Scans the given classes for {@link CombineTransactional}.
+     *
+     * <p>A type-level annotation ends the scan immediately with {@code true}. Otherwise the name of
+     * every annotated public method is added to {@link #methodsToProxy}.
+     *
+     * @param classes the classes to scan; {@code null} elements are skipped
+     * @return true when a type-level or method-level annotation was found
+     */
+    protected boolean existsAnnotation(Class<?>... classes) {
+        boolean result = false;
+        if (CollectionUtils.isNotEmpty(classes)) {
+            for (Class<?> clazz : classes) {
+                if (clazz == null) {
+                    continue;
+                }
+                if (clazz.getAnnotation(CombineTransactional.class) != null) {
+                    return true;
+                }
+                for (Method method : clazz.getMethods()) {
+                    if (method.getAnnotation(CombineTransactional.class) != null) {
+                        methodsToProxy.add(method.getName());
+                        result = true;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+}
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
b/integration-tx-api/src/main/java/org/apache/seata/spring/annotation/CombineTransactional.java
similarity index 63%
copy from
integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
copy to
integration-tx-api/src/main/java/org/apache/seata/spring/annotation/CombineTransactional.java
index d9ac494c4c..bda148d401 100644
---
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
+++
b/integration-tx-api/src/main/java/org/apache/seata/spring/annotation/CombineTransactional.java
@@ -14,25 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seata.integration.tx.api.interceptor;
+package org.apache.seata.spring.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
/**
- * The InvocationHandlerType enum
+ * Marks a type or method whose branch transactions are merged by reusing one connection
+ * per data source.
+ * This annotation is only valid in XA mode.
*/
-public enum InvocationHandlerType {
-
- /**
- * GlobalTransactional InvocationHandler
- */
- GlobalTransactional,
-
- /**
- * TwoPhase InvocationHandler
- */
- TwoPhaseAnnotation,
-
- /**
- * SagaAnnotation InvocationHandler
- */
- SagaAnnotation
-}
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface CombineTransactional {}
diff --git
a/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
b/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
index a11f737b99..402030fc8d 100644
---
a/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
+++
b/integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser
@@ -14,4 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
\ No newline at end of file
+org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
+org.apache.seata.integration.tx.api.interceptor.parser.CombineTransactionalInterceptorParser
\ No newline at end of file
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/BusinessCombineImpl.java
similarity index 56%
copy from
integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
copy to
integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/BusinessCombineImpl.java
index d9ac494c4c..2a4166a509 100644
---
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java
+++
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/BusinessCombineImpl.java
@@ -14,25 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seata.integration.tx.api.interceptor;
+package org.apache.seata.integration.tx.api.interceptor.parser;
+
+import org.apache.seata.spring.annotation.CombineTransactional;
+import org.apache.seata.spring.annotation.GlobalTransactional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The InvocationHandlerType enum
+ * The type Business.
*/
-public enum InvocationHandlerType {
-
- /**
- * GlobalTransactional InvocationHandler
- */
- GlobalTransactional,
-
- /**
- * TwoPhase InvocationHandler
- */
- TwoPhaseAnnotation,
+@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
+@CombineTransactional
+public class BusinessCombineImpl implements Business {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BusinessCombineImpl.class);
- /**
- * SagaAnnotation InvocationHandler
- */
- SagaAnnotation
+ /**
+ * Logs the call and returns {@code "hello " + msg}.
+ */
+ @Override
+ public String doBiz(String msg) {
+ LOGGER.info("Business doBiz");
+ return "hello " + msg;
+ }
}
diff --git
a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParserTest.java
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParserTest.java
new file mode 100644
index 0000000000..a7a9e88b74
--- /dev/null
+++
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/CombineTransactionalInterceptorParserTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.tx.api.interceptor.parser;
+
+import
org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler;
+import
org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link CombineTransactionalInterceptorParser} against a bean that carries both
+ * {@code @GlobalTransactional} and {@code @CombineTransactional}.
+ */
+public class CombineTransactionalInterceptorParserTest {
+
+    /** Both parsers must produce a handler; the combine parser must produce the combine handler. */
+    @Test
+    void parserInterfaceToProxy() throws Exception {
+        // given
+        BusinessCombineImpl business = new BusinessCombineImpl();
+        String objectName = business.getClass().getName();
+
+        // when
+        ProxyInvocationHandler globalHandler =
+                new GlobalTransactionalInterceptorParser().parserInterfaceToProxy(business, objectName);
+        ProxyInvocationHandler combineHandler =
+                new CombineTransactionalInterceptorParser().parserInterfaceToProxy(business, objectName);
+
+        // then
+        Assertions.assertNotNull(globalHandler);
+        assertThat(combineHandler)
+                .as("Combine parser handler type")
+                .isNotNull()
+                .isInstanceOf(
+                        org.apache.seata.integration.tx.api.interceptor.handler
+                                .CombineTransactionalInterceptorHandler.class);
+    }
+
+    /** The default parser must hand back the global-transaction handler first. */
+    @Test
+    void parserInterfaceToProxyGlobalTransactionalIsFirst() throws Exception {
+        // given
+        BusinessCombineImpl business = new BusinessCombineImpl();
+
+        // when
+        ProxyInvocationHandler proxyInvocationHandler =
+                DefaultInterfaceParser.get().parserInterfaceToProxy(business, business.getClass().getName());
+
+        // then
+        assertThat(proxyInvocationHandler)
+                .as("Proxy order verification")
+                .isNotNull()
+                .isInstanceOf(GlobalTransactionalInterceptorHandler.class);
+    }
+}
diff --git
a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
index 66d78409e4..a18bd27c8b 100644
---
a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
+++
b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/interceptor/parser/ProxyUtilsGlobalTransactionalTest.java
@@ -81,4 +81,15 @@ public class ProxyUtilsGlobalTransactionalTest {
Assertions.assertNotNull(result);
}
+
+    /**
+     * A bean annotated with both {@code @GlobalTransactional} and {@code @CombineTransactional}
+     * must still return the value produced by the target method when called through the proxy.
+     */
+    @Test
+    public void testXAWithCombineTransaction() {
+        BusinessCombineImpl businessCombine = new BusinessCombineImpl();
+
+        Business businessProxy = ProxyUtil.createProxy(businessCombine);
+
+        String result = businessProxy.doBiz("test");
+
+        // BusinessCombineImpl.doBiz returns "hello " + msg; pin the value, not just non-null.
+        Assertions.assertEquals("hello test", result);
+    }
}
diff --git
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
new file mode 100644
index 0000000000..3bd6ab1fc8
--- /dev/null
+++
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.combine;
+
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Per-thread registry of the {@link ConnectionProxyXA} instances opened inside a combine
+ * transaction, grouped by the XID read from {@link RootContext} and keyed by data source.
+ */
+public class CombineConnectionHolder {
+    private static final ThreadLocal<Map<String, Map<Object, ConnectionProxyXA>>> CONNECTION_HOLDER =
+            ThreadLocal.withInitial(ConcurrentHashMap::new);
+
+    /**
+     * Looks up the connection registered for {@code dataSource} under the current XID.
+     *
+     * @param dataSource the data source used as key
+     * @return the registered connection, or {@code null} when there is no current XID or no entry
+     */
+    public static ConnectionProxyXA get(DataSource dataSource) {
+        Map<Object, ConnectionProxyXA> connMap = currentConnections();
+        return connMap != null ? connMap.get(dataSource) : null;
+    }
+
+    /**
+     * @return the connections registered under the current XID; empty when there is no current
+     *     XID or nothing was registered
+     */
+    public static Collection<ConnectionProxyXA> getDsConn() {
+        Map<Object, ConnectionProxyXA> connectionMap = currentConnections();
+        return connectionMap != null ? connectionMap.values() : Collections.emptyList();
+    }
+
+    /**
+     * Registers {@code connection} for {@code dataSource} under the current XID. Only the first
+     * connection per data source is kept; it is switched to manual commit and combine mode.
+     *
+     * @param dataSource the data source used as key
+     * @param connection the connection to register
+     * @throws SQLException if disabling auto-commit fails
+     * @throws NullPointerException if there is no current XID
+     */
+    public static void putConnection(DataSource dataSource, ConnectionProxyXA connection) throws SQLException {
+        String xid = RootContext.getXID();
+        if (xid == null) {
+            // ConcurrentHashMap rejects null keys; fail with an explicit message instead.
+            throw new NullPointerException("No XID bound, cannot register a combine connection");
+        }
+        Map<Object, ConnectionProxyXA> connectionProxyMap =
+                CONNECTION_HOLDER.get().computeIfAbsent(xid, k -> new ConcurrentHashMap<>());
+
+        if (connectionProxyMap.putIfAbsent(dataSource, connection) == null) {
+            connection.setAutoCommit(false);
+            connection.setCombine(true);
+        }
+    }
+
+    /**
+     * Drops the entry of the current XID; when nothing is left for this thread the thread-local
+     * itself is removed. Does nothing when there is no current XID.
+     */
+    public static void clear() {
+        String xid = RootContext.getXID();
+        if (xid == null) {
+            return;
+        }
+        Map<String, Map<Object, ConnectionProxyXA>> byXid = CONNECTION_HOLDER.get();
+        byXid.remove(xid);
+        if (byXid.isEmpty()) {
+            // Avoid keeping an empty map attached to the thread.
+            CONNECTION_HOLDER.remove();
+        }
+    }
+
+    /** Returns the map registered under the current XID, or {@code null} without an XID or entry. */
+    private static Map<Object, ConnectionProxyXA> currentConnections() {
+        String xid = RootContext.getXID();
+        // A null XID must not reach ConcurrentHashMap.get, which throws on null keys.
+        return xid != null ? CONNECTION_HOLDER.get().get(xid) : null;
+    }
+}
diff --git
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
index 6ca2ec3186..ff7f5b3294 100644
---
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
+++
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
@@ -73,6 +73,8 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
private final ResourceLock resourceLock = new ResourceLock();
+ private volatile boolean combine = false;
+
/**
* Constructor of Connection Proxy for XA mode.
*
@@ -161,6 +163,7 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
* @throws XAException XAException
*/
public void xaRollback(XAXid xaXid) throws XAException {
+ // End the branch with TMFAIL before asking the XA resource to roll it back.
+ // NOTE(review): this call is unconditional; confirm xaEnd tolerates a branch that has
+ // already been ended or prepared before this method is reached.
+ xaEnd(xaXid, XAResource.TMFAIL);
xaResource.rollback(xaXid);
releaseIfNecessary();
}
@@ -227,6 +230,9 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
@Override
public void commit() throws SQLException {
try (ResourceLock ignored = resourceLock.obtain()) {
+ if (combine) {
+ return;
+ }
if (currentAutoCommitStatus || isReadOnly()) {
// Ignore the committing on an autocommit session and
read-only transaction.
return;
@@ -239,6 +245,9 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
@Override
public void rollback() throws SQLException {
+ if (combine) {
+ return;
+ }
if (currentAutoCommitStatus || isReadOnly()) {
// Ignore the committing on an autocommit session and read-only
transaction.
return;
@@ -300,6 +309,7 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
if (!isHeld()) {
xaBranchXid = null;
}
+ combine = false;
}
private void checkTimeout(Long now) throws XAException {
@@ -312,6 +322,9 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
@Override
public void close() throws SQLException {
try (ResourceLock ignored = resourceLock.obtain()) {
+ if (combine) {
+ return;
+ }
try {
if (xaActive && this.xaBranchXid != null) {
// XA End: Success
@@ -435,4 +448,8 @@ public class ConnectionProxyXA extends
AbstractConnectionProxyXA implements Hold
public ResourceLock getResourceLock() {
return resourceLock;
}
+
+ /**
+ * Sets the combine flag of this connection proxy. While the flag is true, commit(),
+ * rollback() and close() on this proxy return without doing anything.
+ *
+ * @param combine the new value of the combine flag
+ */
+ public void setCombine(boolean combine) {
+ this.combine = combine;
+ }
}
diff --git
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
index 8bef4bea15..38c989359c 100644
---
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
+++
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXA.java
@@ -22,6 +22,7 @@ import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.SeataDataSourceProxy;
+import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
import org.apache.seata.rm.datasource.util.JdbcUtils;
import org.apache.seata.rm.datasource.util.XAUtils;
import org.slf4j.Logger;
@@ -87,12 +88,24 @@ public class DataSourceProxyXA extends
AbstractDataSourceProxyXA {
@Override
public Connection getConnection() throws SQLException {
+ // Inside a global transaction flagged as combine, hand back the proxy already
+ // registered for this data source as long as it is still open.
+ if (RootContext.inGlobalTransaction() &&
RootContext.inCombineTransaction()) {
+ ConnectionProxyXA connectionProxyXA =
CombineConnectionHolder.get(this.dataSource);
+ if (connectionProxyXA != null && !connectionProxyXA.isClosed()) {
+ return connectionProxyXA;
+ }
+ }
Connection connection = dataSource.getConnection();
return getConnectionProxy(connection);
}
@Override
public Connection getConnection(String username, String password) throws
SQLException {
+ // Inside a global transaction flagged as combine, hand back the proxy already
+ // registered for this data source as long as it is still open.
+ // NOTE(review): the lookup is keyed by data source only, so the cached connection is
+ // returned without comparing username/password; confirm this is intended.
+ if (RootContext.inGlobalTransaction() &&
RootContext.inCombineTransaction()) {
+ ConnectionProxyXA connectionProxyXA =
CombineConnectionHolder.get(this.dataSource);
+ if (connectionProxyXA != null && !connectionProxyXA.isClosed()) {
+ return connectionProxyXA;
+ }
+ }
Connection connection = dataSource.getConnection(username, password);
return getConnectionProxy(connection);
}
@@ -101,7 +114,11 @@ public class DataSourceProxyXA extends
AbstractDataSourceProxyXA {
if (!RootContext.inGlobalTransaction()) {
return connection;
}
- return getConnectionProxyXA(connection);
+ ConnectionProxyXA connectionProxyXA = (ConnectionProxyXA)
getConnectionProxyXA(connection);
+ if (RootContext.inCombineTransaction()) {
+ CombineConnectionHolder.putConnection(this.dataSource,
connectionProxyXA);
+ }
+ return connectionProxyXA;
}
@Override
diff --git
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolderTest.java
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolderTest.java
new file mode 100644
index 0000000000..d8b04ad9c3
--- /dev/null
+++
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.combine;
+
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+
+public class CombineConnectionHolderTest {
+
+ @Mock
+ private DataSource dataSource;
+
+ @Mock
+ private ConnectionProxyXA connectionProxyXA;
+
+ private AutoCloseable closeable;
+ private MockedStatic<RootContext> mockedRootContext;
+
+ @BeforeEach
+ public void init() {
+ closeable = MockitoAnnotations.openMocks(this);
+ mockedRootContext = Mockito.mockStatic(RootContext.class);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ closeable.close();
+ mockedRootContext.close();
+ }
+
+ @Test
+ public void testGet() throws SQLException {
+ String xid = "test-xid";
+ mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+
+ assertNull(CombineConnectionHolder.get(dataSource));
+
+ RootContext.bind(xid);
+ CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+ assertSame(connectionProxyXA, CombineConnectionHolder.get(dataSource));
+ CombineConnectionHolder.clear();
+ }
+
+ @Test
+ public void testPutConnection() throws SQLException {
+ String xid = "test-xid";
+ mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+ RootContext.bind(xid);
+ CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+ ConnectionProxyXA getConnectionProxyXA =
CombineConnectionHolder.get(dataSource);
+ assertNotNull(getConnectionProxyXA);
+ assertSame(connectionProxyXA, getConnectionProxyXA);
+
+ verify(connectionProxyXA).setAutoCommit(false);
+ verify(connectionProxyXA).setCombine(true);
+ CombineConnectionHolder.clear();
+ }
+
+ @Test
+ public void testGetDsConn() throws SQLException {
+ String xid = "test-xid";
+ mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+
+ Collection<ConnectionProxyXA> connections =
CombineConnectionHolder.getDsConn();
+ assertTrue(connections.isEmpty());
+
+ RootContext.bind(xid);
+ CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+ connections = CombineConnectionHolder.getDsConn();
+ assertEquals(1, connections.size());
+ assertSame(connectionProxyXA, connections.iterator().next());
+ CombineConnectionHolder.clear();
+ }
+
+ @Test
+ public void testClear() throws SQLException {
+ String xid = "test-xid";
+ mockedRootContext.when(RootContext::getXID).thenReturn(xid);
+
+ RootContext.bind(xid);
+ CombineConnectionHolder.putConnection(dataSource, connectionProxyXA);
+
+ CombineConnectionHolder.clear();
+
+ assertNull(CombineConnectionHolder.get(dataSource));
+ }
+}
diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
index b1121ed806..457472d615 100644
--- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
+++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/xa/DataSourceProxyXATest.java
@@ -21,11 +21,14 @@ import com.kingbase8.xa.KBXAConnection;
import com.mysql.jdbc.JDBC4MySQLConnection;
import com.mysql.jdbc.jdbc2.optional.JDBC4ConnectionWrapper;
import org.apache.seata.core.context.RootContext;
+import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
import org.apache.seata.rm.datasource.mock.MockDataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import javax.sql.DataSource;
@@ -37,11 +40,20 @@ import java.sql.Driver;
import java.sql.SQLException;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for DataSourceProxyXA
*/
public class DataSourceProxyXATest {
+ @BeforeEach
+ public void setUp() {
+ // Clean up context before each test
+ RootContext.unbind();
+ RootContext.unbindBranchType();
+ RootContext.unbindCombineTransaction();
+ }
@Test
public void test_constructor() {
@@ -57,10 +69,10 @@ public class DataSourceProxyXATest {
@Test
public void testGetConnection() throws SQLException {
// Mock
- Driver driver = Mockito.mock(Driver.class);
- JDBC4MySQLConnection connection =
Mockito.mock(JDBC4MySQLConnection.class);
+ Driver driver = mock(Driver.class);
+ JDBC4MySQLConnection connection = mock(JDBC4MySQLConnection.class);
Mockito.when(connection.getAutoCommit()).thenReturn(true);
- DatabaseMetaData metaData = Mockito.mock(DatabaseMetaData.class);
+ DatabaseMetaData metaData = mock(DatabaseMetaData.class);
Mockito.when(metaData.getURL()).thenReturn("jdbc:mysql:xxx");
Mockito.when(connection.getMetaData()).thenReturn(metaData);
Mockito.when(driver.connect(any(), any())).thenReturn(connection);
@@ -91,11 +103,11 @@ public class DataSourceProxyXATest {
@Test
public void testGetMariaXaConnection() throws SQLException,
ClassNotFoundException {
// Mock
- Driver driver = Mockito.mock(Driver.class);
+ Driver driver = mock(Driver.class);
Class clazz = Class.forName("org.mariadb.jdbc.MariaDbConnection");
- Connection connection = (Connection) (Mockito.mock(clazz));
+ Connection connection = (Connection) (mock(clazz));
Mockito.when(connection.getAutoCommit()).thenReturn(true);
- DatabaseMetaData metaData = Mockito.mock(DatabaseMetaData.class);
+ DatabaseMetaData metaData = mock(DatabaseMetaData.class);
Mockito.when(metaData.getURL()).thenReturn("jdbc:mariadb:xxx");
Mockito.when(connection.getMetaData()).thenReturn(metaData);
Mockito.when(driver.connect(any(), any())).thenReturn(connection);
@@ -131,11 +143,11 @@ public class DataSourceProxyXATest {
disabledReason = "druid 1.2.8 correct support kingbase")
public void testGetKingbaseXaConnection() throws SQLException,
ClassNotFoundException {
// Mock
- Driver driver = Mockito.mock(Driver.class);
+ Driver driver = mock(Driver.class);
Class clazz = Class.forName("com.kingbase8.jdbc.KbConnection");
- Connection connection = (Connection) (Mockito.mock(clazz));
+ Connection connection = (Connection) (mock(clazz));
Mockito.when(connection.getAutoCommit()).thenReturn(true);
- DatabaseMetaData metaData = Mockito.mock(DatabaseMetaData.class);
+ DatabaseMetaData metaData = mock(DatabaseMetaData.class);
Mockito.when(metaData.getURL()).thenReturn("jdbc:kingbase8:xxx");
Mockito.when(connection.getMetaData()).thenReturn(metaData);
Mockito.when(driver.connect(any(), any())).thenReturn(connection);
@@ -162,6 +174,38 @@ public class DataSourceProxyXATest {
tearDown();
}
+ @Test
+ public void testGetConnectionInCombineMode() throws SQLException {
+ RootContext.bind("testXID");
+ RootContext.bindCombineTransaction();
+
+ ConnectionProxyXA combineConn = mock(ConnectionProxyXA.class);
+ when(combineConn.isClosed()).thenReturn(false);
+
+ try (MockedStatic<CombineConnectionHolder> holderMock =
Mockito.mockStatic(CombineConnectionHolder.class)) {
+ holderMock
+ .when(() ->
CombineConnectionHolder.get(any(DataSource.class)))
+ .thenReturn(combineConn);
+ Driver driver = mock(Driver.class);
+ JDBC4MySQLConnection connection = mock(JDBC4MySQLConnection.class);
+ Mockito.when(connection.getAutoCommit()).thenReturn(true);
+ DatabaseMetaData metaData = mock(DatabaseMetaData.class);
+ Mockito.when(metaData.getURL()).thenReturn("jdbc:mysql:xxx");
+ Mockito.when(connection.getMetaData()).thenReturn(metaData);
+ Mockito.when(driver.connect(any(), any())).thenReturn(connection);
+
+ DruidDataSource realDataSource = new DruidDataSource();
+ realDataSource.setDriver(driver);
+ DataSourceProxyXA proxyDataSource = new
DataSourceProxyXA(realDataSource);
+
+ Connection result = proxyDataSource.getConnection();
+
+ Assertions.assertEquals(combineConn, result);
+
+ holderMock.verify(() ->
CombineConnectionHolder.get(realDataSource));
+ }
+ }
+
@AfterAll
public static void tearDown() {
RootContext.unbind();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]