maple525866 commented on code in PR #7765:
URL: https://github.com/apache/incubator-seata/pull/7765#discussion_r2525533508
##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -773,4 +815,109 @@ public void close(ChannelHandlerContext ctx,
ChannelPromise future) throws Excep
super.close(ctx, future);
}
}
+
+ protected boolean isEnableReconnect() {
+ return enableReconnect.get();
+ }
+
+ /**
+ * Create or get global worker
+ */
+ private static ExecutorService getOrCreateGlobalWorker() {
+ ExecutorService w = GLOBAL_RECONNECT_WORKER_REF.get();
+ if (w == null || w.isShutdown() || w.isTerminated()) {
+ ExecutorService created = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Global-Reconnect-Worker-" + t.getId());
+ return t;
+ });
+ if (!GLOBAL_RECONNECT_WORKER_REF.compareAndSet(w, created)) {
+ // another thread created it first; shutdown our created one
+ try {
+ created.shutdown();
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to shutdown redundant created worker:
{}", ex.getMessage(), ex);
+ }
+ }
+ }
+ return GLOBAL_RECONNECT_WORKER_REF.get();
+ }
+
+ /**
+ * Start the global reconnect timer if it's not started.
+ */
+ static void startGlobalTimerIfNeeded() {
+ if (GLOBAL_TIMER_STARTED.compareAndSet(false, true)) {
+ ScheduledExecutorService timer = new
ScheduledThreadPoolExecutor(1, r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Global-Reconnect-Timer-" + t.getId());
+ return t;
+ });
+ GLOBAL_RECONNECT_TIMER_REF.set(timer);
+
+ // ensure worker exists before scheduling
+ getOrCreateGlobalWorker();
+
+ timer.scheduleAtFixedRate(
+ () -> {
+ ExecutorService worker =
GLOBAL_RECONNECT_WORKER_REF.get();
+ for (AbstractNettyRemotingClient client :
CLIENT_INSTANCES) {
+ if (client.isEnableReconnect()) {
+ try {
+ if (worker != null &&
!worker.isShutdown()) {
+ worker.submit(client.reconnectTask);
+ } else {
+ client.reconnectTask.run();
+ }
+ } catch (Exception ex) {
+ LOGGER.warn(
+ "Submit reconnect task failed for
client [{}], error: {}",
+ client.getClass().getSimpleName(),
+ ex.getMessage(),
+ ex);
+ }
+ }
+ }
+ },
+ SCHEDULE_DELAY_MILLS,
+ SCHEDULE_INTERVAL_MILLS,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Global client reconnect timer started only one
instance globally");
+ }
+ }
+
+ /**
+ * Shutdown the global reconnect timer and worker when there are no client
instances.
+ */
+ static void shutdownGlobalTimerIfNoClients() {
+ if (CLIENT_INSTANCES.isEmpty() &&
GLOBAL_TIMER_STARTED.compareAndSet(true, false)) {
+ ScheduledExecutorService timer =
GLOBAL_RECONNECT_TIMER_REF.getAndSet(null);
+ if (timer != null) {
+ try {
+ timer.shutdown();
+ timer.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ LOGGER.warn("Shutdown global reconnect timer failed: {}",
ex.getMessage(), ex);
+ }
+ }
Review Comment:
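Currently, `shutdownGlobalTimerIfNoClients()` appears to have a check-then-act
race condition. Suppose another thread creates a client and calls `add(this)`
right after `CLIENT_INSTANCES.isEmpty()` has returned true. The timer and
worker will still be stopped, even though an active client now exists.

Restarting does not reliably help. A concurrent `startGlobalTimerIfNeeded()`
call has two possible outcomes:
- It sees `GLOBAL_TIMER_STARTED` still set and does nothing.
- It creates a new timer, which this in-progress shutdown can then take via
`getAndSet(null)` and stop.

As a result, the new client won't be reconnected by the timer.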
##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -773,4 +815,109 @@ public void close(ChannelHandlerContext ctx,
ChannelPromise future) throws Excep
super.close(ctx, future);
}
}
+
+ protected boolean isEnableReconnect() {
+ return enableReconnect.get();
+ }
+
+ /**
+ * Create or get global worker
+ */
+ private static ExecutorService getOrCreateGlobalWorker() {
+ ExecutorService w = GLOBAL_RECONNECT_WORKER_REF.get();
+ if (w == null || w.isShutdown() || w.isTerminated()) {
+ ExecutorService created = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Global-Reconnect-Worker-" + t.getId());
+ return t;
+ });
+ if (!GLOBAL_RECONNECT_WORKER_REF.compareAndSet(w, created)) {
+ // another thread created it first; shutdown our created one
+ try {
+ created.shutdown();
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to shutdown redundant created worker:
{}", ex.getMessage(), ex);
+ }
+ }
+ }
+ return GLOBAL_RECONNECT_WORKER_REF.get();
+ }
+
+ /**
+ * Start the global reconnect timer if it's not started.
+ */
+ static void startGlobalTimerIfNeeded() {
+ if (GLOBAL_TIMER_STARTED.compareAndSet(false, true)) {
+ ScheduledExecutorService timer = new
ScheduledThreadPoolExecutor(1, r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Global-Reconnect-Timer-" + t.getId());
+ return t;
+ });
+ GLOBAL_RECONNECT_TIMER_REF.set(timer);
+
+ // ensure worker exists before scheduling
+ getOrCreateGlobalWorker();
+
+ timer.scheduleAtFixedRate(
+ () -> {
+ ExecutorService worker =
GLOBAL_RECONNECT_WORKER_REF.get();
+ for (AbstractNettyRemotingClient client :
CLIENT_INSTANCES) {
+ if (client.isEnableReconnect()) {
+ try {
+ if (worker != null &&
!worker.isShutdown()) {
+ worker.submit(client.reconnectTask);
+ } else {
+ client.reconnectTask.run();
+ }
+ } catch (Exception ex) {
+ LOGGER.warn(
+ "Submit reconnect task failed for
client [{}], error: {}",
+ client.getClass().getSimpleName(),
+ ex.getMessage(),
+ ex);
+ }
+ }
+ }
+ },
+ SCHEDULE_DELAY_MILLS,
+ SCHEDULE_INTERVAL_MILLS,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Global client reconnect timer started only one
instance globally");
+ }
+ }
+
+ /**
+ * Shutdown the global reconnect timer and worker when there are no client
instances.
+ */
+ static void shutdownGlobalTimerIfNoClients() {
+ if (CLIENT_INSTANCES.isEmpty() &&
GLOBAL_TIMER_STARTED.compareAndSet(true, false)) {
+ ScheduledExecutorService timer =
GLOBAL_RECONNECT_TIMER_REF.getAndSet(null);
+ if (timer != null) {
+ try {
+ timer.shutdown();
+ timer.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ LOGGER.warn("Shutdown global reconnect timer failed: {}",
ex.getMessage(), ex);
+ }
+ }
+ ExecutorService worker =
GLOBAL_RECONNECT_WORKER_REF.getAndSet(null);
+ if (worker != null) {
+ try {
+ worker.shutdown();
+ worker.awaitTermination(1, TimeUnit.SECONDS);
Review Comment:
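Logically speaking, the `worker` has the same problem:
`GLOBAL_RECONNECT_WORKER_REF.getAndSet(null)` can take and shut down a worker
that a concurrently registered client still needs.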
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]