YvCeung commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2659565118
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements ClusterChangeListener {
     private static final Map<String, Long> GROUP_UPDATE_TERM = new ConcurrentHashMap<>();
+    private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT = new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
             new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));
     @PostConstruct
     public void init() {
-        // Responds to monitors that time out
+        // Periodically check and respond to watchers that have timed out
         scheduledThreadPoolExecutor.scheduleAtFixedRate(
                 () -> {
                     for (String group : WATCHERS.keySet()) {
                         Optional.ofNullable(WATCHERS.remove(group))
                                 .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
-                                    if (System.currentTimeMillis() >= watcher.getTimeout()) {
-                                        watcher.setDone(true);
-                                        sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
-                                    }
-                                    if (!watcher.isDone()) {
-                                        // Re-register
-                                        registryWatcher(watcher);
+                                    HttpContext context = watcher.getAsyncContext();
+                                    boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+                                    if (isHttp2) {
+                                        if (!context.getContext().channel().isActive()) {
+                                            watcher.setDone(true);
+                                            HTTP2_HEADERS_SENT.remove(watcher);
+                                        } else {
+                                            registryWatcher(watcher);
+                                        }
+                                    } else {
+                                        if (System.currentTimeMillis() >= watcher.getTimeout()) {
+                                            watcher.setDone(true);
+                                            sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED, true, false);
+                                        } else if (!watcher.isDone()) {
+                                            registryWatcher(watcher);
+                                        }
                                     }
                                 }));
                     }
Review Comment:
The current implementation is safe for the following reasons:
1. **New watchers won't be lost**: `registryWatcher` uses `computeIfAbsent`, so
even after the scheduled task removes a group's queue via `WATCHERS.remove(group)`,
a newly registered watcher creates a new queue and is added to it.
2. **The scheduled task scans periodically**: it runs once per second, so a new
watcher waits at most 1 second before it is processed.
3. **Concurrent scenarios**: after the scheduled task removes a queue, new
watchers go into a new queue created by `computeIfAbsent`. Watchers that the
scheduled task re-registers go into that same new queue, and all of them are
handled in the next scan.

For long-polling scenarios, a 1-second delay is acceptable. The current
implementation meets the requirements, and no additional optimization is needed.
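
To make the interleaving in point 3 concrete, here is a minimal standalone sketch. It is not the Seata code: the class name `WatcherQueueSketch`, the `demo()` method, and the use of plain strings in place of `Watcher<HttpContext>` are all assumptions made for illustration, and `registryWatcher` here is only assumed to follow the `computeIfAbsent(...).add(...)` pattern described above. It replays one specific ordering (remove, new registration, re-registration) on a single thread and is not a proof of thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WatcherQueueSketch {
    // Hypothetical stand-in for the WATCHERS map; watchers are plain strings here.
    static final Map<String, Queue<String>> WATCHERS = new ConcurrentHashMap<>();

    // Assumed registration pattern: create the group's queue on demand, then add.
    static void registryWatcher(String group, String watcher) {
        WATCHERS.computeIfAbsent(group, g -> new ConcurrentLinkedQueue<>()).add(watcher);
    }

    // Replays the ordering from point 3 and returns the contents of the new queue.
    static List<String> demo() {
        WATCHERS.clear();
        registryWatcher("group-a", "w1");

        // Scheduled task: detach the group's current queue.
        Queue<String> detached = WATCHERS.remove("group-a");

        // A new watcher arrives after the removal and lands in a fresh queue.
        registryWatcher("group-a", "w2");

        // Scheduled task: re-register watchers that have not timed out yet.
        for (String watcher : detached) {
            registryWatcher("group-a", watcher);
        }

        // Both watchers are now in the same new queue, ready for the next scan.
        return new ArrayList<>(WATCHERS.get("group-a"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [w2, w1]
    }
}
```

In this ordering, `w2` and the re-registered `w1` end up in one queue that the next scan picks up.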
@funky-eyes WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.