lokidundun commented on code in PR #7765:
URL: https://github.com/apache/incubator-seata/pull/7765#discussion_r2525672028
##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -773,4 +815,109 @@ public void close(ChannelHandlerContext ctx,
ChannelPromise future) throws Excep
super.close(ctx, future);
}
}
+
+ /**
+ * Report whether this client should currently be picked up by the global
+ * reconnect timer.
+ *
+ * <p>The global timer calls this on every tick and only schedules
+ * {@code reconnectTask} for clients that return {@code true}.
+ * NOTE(review): {@code enableReconnect} is declared outside this hunk; the
+ * {@code .get()} call suggests an {@code AtomicBoolean} — confirm.
+ *
+ * @return the current value of the {@code enableReconnect} flag
+ */
+ protected boolean isEnableReconnect() {
+ return enableReconnect.get();
+ }
+
+ /**
+ * Return the shared reconnect worker pool, creating a new one if needed.
+ *
+ * <p>A new pool is built when {@code GLOBAL_RECONNECT_WORKER_REF} is empty or
+ * holds an executor that is shut down or terminated. The pool has a fixed size
+ * of {@code max(2, availableProcessors)} daemon threads named
+ * {@code Global-Reconnect-Worker-<id>} and an unbounded {@code LinkedBlockingQueue}.
+ * It is installed with a compare-and-set. When several threads race, only one
+ * pool is installed, and each losing thread shuts down the pool it built.
+ *
+ * @return the executor held in {@code GLOBAL_RECONNECT_WORKER_REF} after the
+ * create-or-reuse step
+ */
+ private static ExecutorService getOrCreateGlobalWorker() {
+ ExecutorService w = GLOBAL_RECONNECT_WORKER_REF.get();
+ // isTerminated() can only be true after a shutdown, so the isShutdown()
+ // check already covers it; the extra check is redundant but harmless.
+ if (w == null || w.isShutdown() || w.isTerminated()) {
+ // Core size == max size, so this is a fixed-size pool; the 0 ms keep-alive
+ // only applies to threads above the core size and has no effect here.
+ ExecutorService created = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Global-Reconnect-Worker-" + t.getId());
+ return t;
+ });
+ // Install only if the reference still holds the value we inspected above.
+ if (!GLOBAL_RECONNECT_WORKER_REF.compareAndSet(w, created)) {
+ // Another thread replaced the reference first. Discard our pool: it was
+ // never given a task, so shutting it down has nothing to drain.
+ try {
+ created.shutdown();
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to shutdown redundant created worker:
{}", ex.getMessage(), ex);
+ }
+ }
+ }
+ // Re-read instead of returning `created`: after a lost CAS, the winner's pool
+ // is the one in use. NOTE(review): nothing here guarantees this read is
+ // non-null or live if another code path clears or shuts down the reference
+ // concurrently — confirm callers tolerate that.
+ return GLOBAL_RECONNECT_WORKER_REF.get();
+ }
+
+ /**
+ * Start the single, process-wide reconnect timer if it has not been started yet.
+ *
+ * <p>Only the caller that flips {@code GLOBAL_TIMER_STARTED} from {@code false}
+ * to {@code true} creates the timer; every other call is a no-op. The timer is
+ * a one-thread {@code ScheduledThreadPoolExecutor} with daemon threads named
+ * {@code Global-Reconnect-Timer-<id>}. It first fires after
+ * {@code SCHEDULE_DELAY_MILLS} ms and then every {@code SCHEDULE_INTERVAL_MILLS} ms.
+ *
+ * <p>On each tick it walks {@code CLIENT_INSTANCES}. For every client whose
+ * {@code isEnableReconnect()} returns {@code true}, it hands
+ * {@code client.reconnectTask} to the shared worker pool. If no usable worker
+ * exists, it runs the task directly on the timer thread instead. A failure for
+ * one client is logged and does not stop processing of the remaining clients.
+ */
+ static void startGlobalTimerIfNeeded() {
+ if (GLOBAL_TIMER_STARTED.compareAndSet(false, true)) {
+ // NOTE(review): the started flag becomes true here, but the timer is only
+ // published to GLOBAL_RECONNECT_TIMER_REF a few lines below. A concurrent
+ // shutdown that wins GLOBAL_TIMER_STARTED.compareAndSet(true, false) in
+ // that window would find no timer to stop, leaving this one running.
+ // Confirm the shutdown lock is also held around this start path.
+ ScheduledExecutorService timer = new
ScheduledThreadPoolExecutor(1, r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Global-Reconnect-Timer-" + t.getId());
+ return t;
+ });
+ GLOBAL_RECONNECT_TIMER_REF.set(timer);
+
+ // Make sure the shared worker pool exists before the first tick can fire.
+ getOrCreateGlobalWorker();
+
+ timer.scheduleAtFixedRate(
+ () -> {
+ ExecutorService worker =
GLOBAL_RECONNECT_WORKER_REF.get();
+ // NOTE(review): assumes CLIENT_INSTANCES tolerates concurrent
+ // modification while being iterated (e.g. a concurrent collection) —
+ // confirm. Any exception that escapes this Runnable silently cancels
+ // all future executions of a scheduleAtFixedRate task.
+ for (AbstractNettyRemotingClient client :
CLIENT_INSTANCES) {
+ if (client.isEnableReconnect()) {
+ try {
+ // Prefer the shared worker; if it is missing or shut down,
+ // run the task inline on the timer thread instead.
+ // NOTE(review): the inline fallback blocks the only timer
+ // thread and delays later ticks. Fixed-rate scheduling also
+ // submits again every period, even while this client's
+ // previous reconnectTask is still queued or running. The
+ // worker queue is unbounded, so consider a per-client
+ // in-flight guard.
+ if (worker != null &&
!worker.isShutdown()) {
+ worker.submit(client.reconnectTask);
+ } else {
+ client.reconnectTask.run();
+ }
+ } catch (Exception ex) {
+ // Also covers a RejectedExecutionException if the worker is
+ // shut down between the isShutdown() check and submit().
+ // Log and continue so one client cannot block the others.
+ LOGGER.warn(
+ "Submit reconnect task failed for
client [{}], error: {}",
+ client.getClass().getSimpleName(),
+ ex.getMessage(),
+ ex);
+ }
+ }
+ }
+ },
+ SCHEDULE_DELAY_MILLS,
+ SCHEDULE_INTERVAL_MILLS,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Global client reconnect timer started only one
instance globally");
+ }
+ }
+
+ /**
+ * Shutdown the global reconnect timer and worker when there are no client
instances.
+ */
+ static void shutdownGlobalTimerIfNoClients() {
+ if (CLIENT_INSTANCES.isEmpty() &&
GLOBAL_TIMER_STARTED.compareAndSet(true, false)) {
+ ScheduledExecutorService timer =
GLOBAL_RECONNECT_TIMER_REF.getAndSet(null);
+ if (timer != null) {
+ try {
+ timer.shutdown();
+ timer.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ LOGGER.warn("Shutdown global reconnect timer failed: {}",
ex.getMessage(), ex);
+ }
+ }
Review Comment:
Thank you for identifying the race condition. I have tried to resolve it by
introducing a global lock and a double-check mechanism, so that checking the
client instances and shutting down resources happen as one atomic step.

Key fixes:
- Added `private static final ReentrantLock GLOBAL_SHUTDOWN_LOCK` to
  synchronize the entire operation of checking `CLIENT_INSTANCES.isEmpty()` and
  shutting down the timer/worker.

Please take another look when you have time ❤️.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]