This is an automated email from the ASF dual-hosted git repository.
jimin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 7af668916a optimize: remove hardcoded port configuration in test classes (#7460)
7af668916a is described below
commit 7af668916a0caf511b54205db85b509259350abd
Author: Eric Wang <[email protected]>
AuthorDate: Mon Sep 29 14:54:42 2025 +0800
optimize: remove hardcoded port configuration in test classes (#7460)
---
changes/en-us/2.x.md | 3 +-
changes/zh-cn/2.x.md | 1 +
.../seata/core/rpc/netty/BaseNettyClientTest.java | 192 ++++++++++++++++++
.../seata/core/rpc/netty/MsgVersionHelperTest.java | 99 +++-------
.../seata/core/rpc/netty/RmNettyClientTest.java | 132 +++++--------
.../seata/core/rpc/netty/TmNettyClientTest.java | 216 +++++++--------------
6 files changed, 341 insertions(+), 302 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 09c9ac22ca..30060e2586 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -41,9 +41,8 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7662](https://github.com/apache/incubator-seata/pull/7662)] ensure
visibility of rm and The methods in MockTest are executed in order
-
### optimize:
-
+- [[#7460](https://github.com/apache/incubator-seata/pull/7460)] Remove
hardcoded port configuration in core module test classes
- [[#7478](https://github.com/apache/incubator-seata/pull/7484)] optimize:
remove client id metric
- [[#7557](https://github.com/seata/seata/pull/7557)] upgrade some npmjs
dependencies
- [[#7576](https://github.com/seata/seata/pull/7576)] Add empty push
protection for Configuration
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 09417e6795..f8c9e403b7 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -44,6 +44,7 @@
### optimize:
+- [[#7460](https://github.com/apache/incubator-seata/pull/7460)] 移除 core
模块测试类中的硬编码端口配置
- [[#7478](https://github.com/apache/incubator-seata/pull/7484)] 删除client id指标
- [[#7557](https://github.com/seata/seata/pull/7557)] 升级 npmjs 依赖
- [[#7576](https://github.com/seata/seata/pull/7576)] 针对配置变更增加空推保护
diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/BaseNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/BaseNettyClientTest.java
new file mode 100644
index 0000000000..2538732ab3
--- /dev/null
+++ b/test/src/test/java/org/apache/seata/core/rpc/netty/BaseNettyClientTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import org.apache.seata.common.ConfigurationKeys;
+import org.apache.seata.common.ConfigurationTestHelper;
+import org.apache.seata.common.XID;
+import org.apache.seata.common.util.NetUtil;
+import org.apache.seata.common.util.UUIDGenerator;
+import org.apache.seata.server.coordinator.DefaultCoordinator;
+import org.apache.seata.server.session.SessionHolder;
+import org.junit.jupiter.api.AfterEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base test class for Netty client tests to eliminate code duplication
+ */
+public abstract class BaseNettyClientTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseNettyClientTest.class);
+
+ /**
+ * Get a dynamic available port
+ */
+ protected static int getDynamicPort() throws IOException {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ return serverSocket.getLocalPort();
+ }
+ }
+
+ /**
+ * Initialize message executor thread pool
+ */
+ protected static ThreadPoolExecutor initMessageExecutor() {
+ return new ThreadPoolExecutor(
+ 5,
+ 5,
+ 500,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(20000),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
+ /**
+ * Start Seata server with dynamic port and intelligent waiting
+ */
+ protected ServerInstance startServer(int port) throws Exception {
+ ThreadPoolExecutor workingThreads = initMessageExecutor();
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setServerListenPort(port);
+ NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads, serverConfig);
+
+ AtomicBoolean serverStatus = new AtomicBoolean();
+ Thread thread = new Thread(() -> {
+ try {
+ SessionHolder.init(null);
+
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
+ // set registry
+ XID.setIpAddress(NetUtil.getLocalIp());
+ XID.setPort(port);
+ // init snowflake for transactionId, branchId
+ UUIDGenerator.init(1L);
+ System.out.println(
+ "pid info: " +
ManagementFactory.getRuntimeMXBean().getName());
+ nettyRemotingServer.init();
+ serverStatus.set(true);
+ } catch (Throwable t) {
+ serverStatus.set(false);
+ LOGGER.error("The seata-server failed to start", t);
+ }
+ });
+ thread.start();
+
+ // Wait for the seata-server to start with intelligent waiting
+ long start = System.nanoTime();
+ long maxWaitNanoTime = 10 * 1000 * 1000 * 1000L; // 10s
+ while (System.nanoTime() - start < maxWaitNanoTime) {
+ Thread.sleep(100);
+ if (serverStatus.get()) {
+ break;
+ }
+ }
+ if (!serverStatus.get()) {
+ throw new RuntimeException("Waiting for a while, but the seata-server did not start successfully.");
+ }
+
+ return new ServerInstance(nettyRemotingServer, port);
+ }
+
+ /**
+ * Start server with simpler logic (for some tests that don't need intelligent waiting)
+ */
+ protected ServerInstance startServerSimple(int port) throws Exception {
+ ThreadPoolExecutor workingThreads = initMessageExecutor();
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setServerListenPort(port);
+ NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads, serverConfig);
+
+ new Thread(() -> {
+ SessionHolder.init(null);
+
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
+ // set registry
+ XID.setIpAddress(NetUtil.getLocalIp());
+ XID.setPort(port);
+ // init snowflake for transactionId, branchId
+ UUIDGenerator.init(1L);
+ nettyRemotingServer.init();
+ })
+ .start();
+
+ Thread.sleep(3000); // Simple wait
+ return new ServerInstance(nettyRemotingServer, port);
+ }
+
+ /**
+ * Configure client to use the specified port
+ */
+ protected void configureClient(int port) {
+ ConfigurationTestHelper.putConfig("service.default.grouplist",
"127.0.0.1:" + port);
+
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL,
String.valueOf(port));
+ }
+
+ /**
+ * Clean up client configuration
+ */
+ protected void cleanupClientConfig() {
+ ConfigurationTestHelper.removeConfig("service.default.grouplist");
+
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
+ }
+
+ /**
+ * Server instance wrapper to hold server and port information
+ */
+ protected static class ServerInstance {
+ private final NettyRemotingServer server;
+ private final int port;
+
+ public ServerInstance(NettyRemotingServer server, int port) {
+ this.server = server;
+ this.port = port;
+ }
+
+ public NettyRemotingServer getServer() {
+ return server;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getAddress() {
+ return "127.0.0.1:" + port;
+ }
+
+ public void destroy() {
+ if (server != null) {
+ server.destroy();
+ }
+ }
+ }
+
+ /**
+ * Clean up configuration after each test
+ */
+ @AfterEach
+ public void cleanupAfterTest() {
+ cleanupClientConfig();
+ }
+}
diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java
index 7f2b4273f2..95918d0579 100644
--- a/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java
+++ b/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java
@@ -17,95 +17,52 @@
package org.apache.seata.core.rpc.netty;
import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
-import org.apache.seata.common.XID;
-import org.apache.seata.common.util.NetUtil;
-import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.protocol.VersionNotSupportMessage;
import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest;
import org.apache.seata.core.rpc.MsgVersionHelper;
-import org.apache.seata.server.coordinator.DefaultCoordinator;
-import org.apache.seata.server.session.SessionHolder;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
- * MsgVersionHelper Test
- **/
-public class MsgVersionHelperTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MsgVersionHelperTest.class);
-
- @BeforeAll
- public static void init() {
-
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL,
"8091");
- }
-
- @AfterAll
- public static void after() {
-
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
- }
-
- public static ThreadPoolExecutor initMessageExecutor() {
- return new ThreadPoolExecutor(
- 5,
- 5,
- 500,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(20000),
- new ThreadPoolExecutor.CallerRunsPolicy());
- }
+ * MsgVersionHelper Test - Refactored to use BaseNettyClientTest
+ */
+public class MsgVersionHelperTest extends BaseNettyClientTest {
@Test
public void testSendMsgWithResponse() throws Exception {
- ThreadPoolExecutor workingThreads = initMessageExecutor();
- NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads);
- new Thread(() -> {
- SessionHolder.init(null);
-
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
- // set registry
- XID.setIpAddress(NetUtil.getLocalIp());
- XID.setPort(8091);
- // init snowflake for transactionId, branchId
- UUIDGenerator.init(1L);
- nettyRemotingServer.init();
- })
- .start();
- Thread.sleep(3000);
+ int dynamicPort = getDynamicPort();
+ ServerInstance serverInstance = startServerSimple(dynamicPort);
+
+ try {
+ configureClient(dynamicPort);
- String applicationId = "app 1";
- String transactionServiceGroup = "default_tx_group";
- TmNettyRemotingClient tmNettyRemotingClient =
- TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
- tmNettyRemotingClient.init();
+ String applicationId = "app 1";
+ String transactionServiceGroup = "default_tx_group";
+ TmNettyRemotingClient tmNettyRemotingClient =
+ TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
+ tmNettyRemotingClient.init();
- String serverAddress = "0.0.0.0:8091";
- Channel channel =
-
TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
+ Channel channel = TmNettyRemotingClient.getInstance()
+ .getClientChannelManager()
+ .acquireChannel(serverInstance.getAddress());
- RpcMessage rpcMessage =
buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
- Assertions.assertFalse(MsgVersionHelper.versionNotSupport(channel,
rpcMessage));
- TmNettyRemotingClient.getInstance().sendAsync(channel, rpcMessage);
+ RpcMessage rpcMessage =
buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+ Assertions.assertFalse(MsgVersionHelper.versionNotSupport(channel,
rpcMessage));
+ TmNettyRemotingClient.getInstance().sendAsync(channel, rpcMessage);
- Version.putChannelVersion(channel, "0.7.0");
- Assertions.assertTrue(MsgVersionHelper.versionNotSupport(channel,
rpcMessage));
- TmNettyRemotingClient.getInstance().sendAsync(channel, rpcMessage);
- Object response =
TmNettyRemotingClient.getInstance().sendSync(channel, rpcMessage, 100);
- Assertions.assertTrue(response instanceof VersionNotSupportMessage);
+ Version.putChannelVersion(channel, "0.7.0");
+ Assertions.assertTrue(MsgVersionHelper.versionNotSupport(channel,
rpcMessage));
+ TmNettyRemotingClient.getInstance().sendAsync(channel, rpcMessage);
+ Object response =
TmNettyRemotingClient.getInstance().sendSync(channel, rpcMessage, 100);
+ Assertions.assertTrue(response instanceof
VersionNotSupportMessage);
- nettyRemotingServer.destroy();
- tmNettyRemotingClient.destroy();
+ tmNettyRemotingClient.destroy();
+ } finally {
+ serverInstance.destroy();
+ }
}
private RpcMessage buildUndoLogDeleteMsg(byte messageType) {
diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
index dd8b00e40a..1f03a3580b 100644
--- a/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
+++ b/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
@@ -17,108 +17,72 @@
package org.apache.seata.core.rpc.netty;
import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
-import org.apache.seata.common.XID;
-import org.apache.seata.common.util.NetUtil;
-import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
import org.apache.seata.rm.tcc.TCCResourceManager;
-import org.apache.seata.saga.engine.db.AbstractServerTest;
-import org.apache.seata.server.coordinator.DefaultCoordinator;
-import org.apache.seata.server.session.SessionHolder;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class RmNettyClientTest extends AbstractServerTest {
+/**
+ * RmNettyClient Test - Refactored to use BaseNettyClientTest
+ */
+public class RmNettyClientTest extends BaseNettyClientTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RmNettyClientTest.class);
+ @Test
+ public void testMergeMsg() throws Exception {
+ int dynamicPort = getDynamicPort();
+ ServerInstance serverInstance = startServerSimple(dynamicPort);
- @BeforeAll
- public static void init() {
-
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL,
"8091");
- }
+ try {
+ configureClient(dynamicPort);
- @AfterAll
- public static void after() {
-
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
- }
+ String applicationId = "app 1";
+ String transactionServiceGroup = "default_tx_group";
+ RmNettyRemotingClient rmNettyRemotingClient =
+ RmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
+ rmNettyRemotingClient.setResourceManager(new TCCResourceManager());
+ rmNettyRemotingClient.init();
+
rmNettyRemotingClient.getClientChannelManager().initReconnect(transactionServiceGroup,
true);
- public static ThreadPoolExecutor initMessageExecutor() {
- return new ThreadPoolExecutor(
- 5,
- 5,
- 500,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(20000),
- new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
- @Test
- public void testMergeMsg() throws Exception {
- ThreadPoolExecutor workingThreads = initMessageExecutor();
- NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads);
- new Thread(() -> {
- SessionHolder.init(null);
-
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
- // set registry
- XID.setIpAddress(NetUtil.getLocalIp());
- XID.setPort(8091);
- // init snowflake for transactionId, branchId
- UUIDGenerator.init(1L);
- nettyRemotingServer.init();
- })
- .start();
- Thread.sleep(3000);
+ Channel channel = RmNettyRemotingClient.getInstance()
+ .getClientChannelManager()
+ .acquireChannel(serverInstance.getAddress());
+ Assertions.assertNotNull(channel);
- String applicationId = "app 1";
- String transactionServiceGroup = "default_tx_group";
- RmNettyRemotingClient rmNettyRemotingClient =
- RmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
- rmNettyRemotingClient.setResourceManager(new TCCResourceManager());
- rmNettyRemotingClient.init();
-
rmNettyRemotingClient.getClientChannelManager().initReconnect(transactionServiceGroup,
true);
- String serverAddress = "0.0.0.0:8091";
- Channel channel =
-
RmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
- Assertions.assertNotNull(channel);
+ CountDownLatch latch = new CountDownLatch(3);
+ for (int i = 0; i < 3; i++) {
+ CompletableFuture.runAsync(() -> {
+ BranchRegisterRequest request = new
BranchRegisterRequest();
+ request.setXid("127.0.0.1:" + dynamicPort + ":1249853");
+ request.setLockKey("lock key testSendMsgWithResponse");
+ request.setResourceId("resoutceId1");
+ BranchRegisterResponse branchRegisterResponse = null;
+ try {
+ branchRegisterResponse =
+ (BranchRegisterResponse)
rmNettyRemotingClient.sendSyncRequest(request);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ Assertions.assertNotNull(branchRegisterResponse);
+ Assertions.assertEquals(ResultCode.Failed,
branchRegisterResponse.getResultCode());
+ Assertions.assertEquals(
+ "TransactionException[Could not found global transaction xid = 127.0.0.1:" + dynamicPort
+ + ":1249853, may be has finished.]",
+ branchRegisterResponse.getMsg());
+ latch.countDown();
+ });
+ }
+ latch.await(10, TimeUnit.SECONDS);
- CountDownLatch latch = new CountDownLatch(3);
- for (int i = 0; i < 3; i++) {
- CompletableFuture.runAsync(() -> {
- BranchRegisterRequest request = new BranchRegisterRequest();
- request.setXid("127.0.0.1:8091:1249853");
- request.setLockKey("lock key testSendMsgWithResponse");
- request.setResourceId("resoutceId1");
- BranchRegisterResponse branchRegisterResponse = null;
- try {
- branchRegisterResponse = (BranchRegisterResponse)
rmNettyRemotingClient.sendSyncRequest(request);
- } catch (TimeoutException e) {
- throw new RuntimeException(e);
- }
- Assertions.assertNotNull(branchRegisterResponse);
- Assertions.assertEquals(ResultCode.Failed,
branchRegisterResponse.getResultCode());
- Assertions.assertEquals(
- "TransactionException[Could not found global
transaction xid = 127.0.0.1:8091:1249853, may be has finished.]",
- branchRegisterResponse.getMsg());
- latch.countDown();
- });
+ rmNettyRemotingClient.destroy();
+ } finally {
+ serverInstance.destroy();
}
- latch.await(10, TimeUnit.SECONDS);
- nettyRemotingServer.destroy();
- rmNettyRemotingClient.destroy();
}
}
diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
index f778ba54b0..e1cf6e6957 100644
--- a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
+++ b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
@@ -17,51 +17,18 @@
package org.apache.seata.core.rpc.netty;
import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
-import org.apache.seata.common.XID;
-import org.apache.seata.common.util.NetUtil;
-import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.protocol.transaction.GlobalCommitRequest;
import org.apache.seata.core.protocol.transaction.GlobalCommitResponse;
-import org.apache.seata.saga.engine.db.AbstractServerTest;
-import org.apache.seata.server.coordinator.DefaultCoordinator;
-import org.apache.seata.server.session.SessionHolder;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
+ * TmNettyClient Test - Refactored to use BaseNettyClientTest
*/
-public class TmNettyClientTest extends AbstractServerTest {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(TmNettyClientTest.class);
-
- @BeforeAll
- public static void init() {
-
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL,
"8091");
- }
-
- @AfterAll
- public static void after() {
-
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
- }
-
- public static ThreadPoolExecutor initMessageExecutor() {
- return new ThreadPoolExecutor(
- 5, 5, 500, TimeUnit.SECONDS, new LinkedBlockingQueue(20000),
new ThreadPoolExecutor.CallerRunsPolicy());
- }
+public class TmNettyClientTest extends BaseNettyClientTest {
/**
* Client rely on server's starting first
@@ -70,55 +37,28 @@ public class TmNettyClientTest extends AbstractServerTest {
*/
@Test
public void testDoConnect() throws Exception {
- ThreadPoolExecutor workingThreads = initMessageExecutor();
- NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads);
- // start services server first
- AtomicBoolean serverStatus = new AtomicBoolean();
- Thread thread = new Thread(() -> {
- try {
-
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
- // set registry
- XID.setIpAddress(NetUtil.getLocalIp());
- XID.setPort(8091);
- // init snowflake for transactionId, branchId
- UUIDGenerator.init(1L);
- System.out.println(
- "pid info: " +
ManagementFactory.getRuntimeMXBean().getName());
- nettyRemotingServer.init();
- serverStatus.set(true);
- } catch (Throwable t) {
- serverStatus.set(false);
- LOGGER.error("The seata-server failed to start", t);
- }
- });
- thread.start();
-
- // Wait for the seata-server to start.
- long start = System.nanoTime();
- long maxWaitNanoTime = 10 * 1000 * 1000 * 1000L; // 10s
- while (System.nanoTime() - start < maxWaitNanoTime) {
- Thread.sleep(100);
- if (serverStatus.get()) {
- break;
- }
- }
- if (!serverStatus.get()) {
- throw new RuntimeException("Waiting for a while, but the
seata-server did not start successfully.");
- }
+ int dynamicPort = getDynamicPort();
+ ServerInstance serverInstance = startServer(dynamicPort);
- // then test client
- String applicationId = "app 1";
- String transactionServiceGroup = "group A";
- TmNettyRemotingClient tmNettyRemotingClient =
- TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
-
- tmNettyRemotingClient.init();
- String serverAddress = "0.0.0.0:8091";
- Channel channel =
-
TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
- Assertions.assertNotNull(channel);
- nettyRemotingServer.destroy();
- tmNettyRemotingClient.destroy();
+ try {
+ configureClient(dynamicPort);
+
+ // then test client
+ String applicationId = "app 1";
+ String transactionServiceGroup = "group A";
+ TmNettyRemotingClient tmNettyRemotingClient =
+ TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
+
+ tmNettyRemotingClient.init();
+ Channel channel = TmNettyRemotingClient.getInstance()
+ .getClientChannelManager()
+ .acquireChannel(serverInstance.getAddress());
+ Assertions.assertNotNull(channel);
+
+ tmNettyRemotingClient.destroy();
+ } finally {
+ serverInstance.destroy();
+ }
}
/**
@@ -128,73 +68,59 @@ public class TmNettyClientTest extends AbstractServerTest {
*/
@Test
public void testReconnect() throws Exception {
- ThreadPoolExecutor workingThreads = initMessageExecutor();
- NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads);
- // start services server first
- Thread thread = new Thread(() -> {
-
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
- // set registry
- XID.setIpAddress(NetUtil.getLocalIp());
- XID.setPort(8091);
- // init snowflake for transactionId, branchId
- UUIDGenerator.init(1L);
- nettyRemotingServer.init();
- });
- thread.start();
-
- // then test client
- Thread.sleep(3000);
-
- String applicationId = "app 1";
- String transactionServiceGroup = "default_tx_group";
- TmNettyRemotingClient tmNettyRemotingClient =
- TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
-
- tmNettyRemotingClient.init();
-
-
TmNettyRemotingClient.getInstance().getClientChannelManager().reconnect(transactionServiceGroup);
- nettyRemotingServer.destroy();
- tmNettyRemotingClient.destroy();
+ int dynamicPort = getDynamicPort();
+ ServerInstance serverInstance = startServerSimple(dynamicPort);
+
+ try {
+ configureClient(dynamicPort);
+
+ String applicationId = "app 1";
+ String transactionServiceGroup = "default_tx_group";
+ TmNettyRemotingClient tmNettyRemotingClient =
+ TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
+
+ tmNettyRemotingClient.init();
+
TmNettyRemotingClient.getInstance().getClientChannelManager().reconnect(transactionServiceGroup);
+
+ tmNettyRemotingClient.destroy();
+ } finally {
+ serverInstance.destroy();
+ }
}
@Test
public void testSendMsgWithResponse() throws Exception {
- ThreadPoolExecutor workingThreads = initMessageExecutor();
- NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(workingThreads);
- new Thread(() -> {
- SessionHolder.init(null);
-
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
- // set registry
- XID.setIpAddress(NetUtil.getLocalIp());
- XID.setPort(8091);
- // init snowflake for transactionId, branchId
- UUIDGenerator.init(1L);
- nettyRemotingServer.init();
- })
- .start();
- Thread.sleep(3000);
-
- String applicationId = "app 1";
- String transactionServiceGroup = "default_tx_group";
- TmNettyRemotingClient tmNettyRemotingClient =
- TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
- tmNettyRemotingClient.init();
-
- String serverAddress = "0.0.0.0:8091";
- Channel channel =
-
TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
- Assertions.assertNotNull(channel);
- GlobalCommitRequest request = new GlobalCommitRequest();
- request.setXid("127.0.0.1:8091:1249853");
- GlobalCommitResponse globalCommitResponse = null;
+ int dynamicPort = getDynamicPort();
+ ServerInstance serverInstance = startServerSimple(dynamicPort);
+
try {
- globalCommitResponse = (GlobalCommitResponse)
tmNettyRemotingClient.sendSyncRequest(request);
- } catch (TimeoutException e) {
- throw new RuntimeException(e);
+ configureClient(dynamicPort);
+
+ String applicationId = "app 1";
+ String transactionServiceGroup = "default_tx_group";
+ TmNettyRemotingClient tmNettyRemotingClient =
+ TmNettyRemotingClient.getInstance(applicationId,
transactionServiceGroup);
+ tmNettyRemotingClient.init();
+
+ Channel channel = TmNettyRemotingClient.getInstance()
+ .getClientChannelManager()
+ .acquireChannel(serverInstance.getAddress());
+ Assertions.assertNotNull(channel);
+
+ GlobalCommitRequest request = new GlobalCommitRequest();
+ request.setXid("127.0.0.1:" + dynamicPort + ":1249853");
+ GlobalCommitResponse globalCommitResponse = null;
+ try {
+ globalCommitResponse = (GlobalCommitResponse)
tmNettyRemotingClient.sendSyncRequest(request);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ Assertions.assertNotNull(globalCommitResponse);
+ Assertions.assertEquals(GlobalStatus.Finished,
globalCommitResponse.getGlobalStatus());
+
+ tmNettyRemotingClient.destroy();
+ } finally {
+ serverInstance.destroy();
}
- Assertions.assertNotNull(globalCommitResponse);
- Assertions.assertEquals(GlobalStatus.Finished,
globalCommitResponse.getGlobalStatus());
- nettyRemotingServer.destroy();
- tmNettyRemotingClient.destroy();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]