This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d2fd068be7 fix: registerProducer should not be affected by concurrent
scanNotAct… (#8847)
d2fd068be7 is described below
commit d2fd068be77d06495d810b799d29c2d1f222e4dc
Author: Zhanhui Li <[email protected]>
AuthorDate: Wed Oct 23 09:37:17 2024 +0800
fix: registerProducer should not be affected by concurrent scanNotAct…
(#8847)
* fix: registerProducer should not be affected by concurrent
scanNotActiveChannel
Signed-off-by: Li Zhanhui <[email protected]>
* chore: fix code format and make CI pass
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
---
.../rocketmq/broker/client/ProducerManager.java | 36 ++++++++++++----------
1 file changed, 20 insertions(+), 16 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 011c9e4be3..2c3acb6ba9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -71,15 +71,15 @@ public class ProducerManager {
public ProducerTableInfo getProducerTable() {
Map<String, List<ProducerInfo>> map = new HashMap<>();
for (String group : this.groupChannelTable.keySet()) {
- for (Entry<Channel, ClientChannelInfo> entry:
this.groupChannelTable.get(group).entrySet()) {
+ for (Entry<Channel, ClientChannelInfo> entry :
this.groupChannelTable.get(group).entrySet()) {
ClientChannelInfo clientChannelInfo = entry.getValue();
if (map.containsKey(group)) {
map.get(group).add(new ProducerInfo(
- clientChannelInfo.getClientId(),
-
clientChannelInfo.getChannel().remoteAddress().toString(),
- clientChannelInfo.getLanguage(),
- clientChannelInfo.getVersion(),
- clientChannelInfo.getLastUpdateTimestamp()
+ clientChannelInfo.getClientId(),
+
clientChannelInfo.getChannel().remoteAddress().toString(),
+ clientChannelInfo.getLanguage(),
+ clientChannelInfo.getVersion(),
+ clientChannelInfo.getLastUpdateTimestamp()
));
} else {
map.put(group, new ArrayList<>(Collections.singleton(new
ProducerInfo(
@@ -118,8 +118,8 @@ public class ProducerManager {
clientChannelTable.remove(info.getClientId());
}
log.warn(
- "ProducerManager#scanNotActiveChannel: remove
expired channel[{}] from ProducerManager groupChannelTable, producer group
name: {}",
-
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+ "ProducerManager#scanNotActiveChannel: remove expired
channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, info);
RemotingHelper.closeChannel(info.getChannel());
}
@@ -144,8 +144,8 @@ public class ProducerManager {
clientChannelTable.remove(clientChannelInfo.getClientId());
removed = true;
log.info(
- "NETTY EVENT: remove channel[{}][{}] from
ProducerManager groupChannelTable, producer group: {}",
- clientChannelInfo.toString(), remoteAddr, group);
+ "NETTY EVENT: remove channel[{}][{}] from
ProducerManager groupChannelTable, producer group: {}",
+ clientChannelInfo.toString(), remoteAddr, group);
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group,
clientChannelInfo);
if (clientChannelInfoTable.isEmpty()) {
ConcurrentMap<Channel, ClientChannelInfo>
oldGroupTable = this.groupChannelTable.remove(group);
@@ -167,21 +167,26 @@ public class ProducerManager {
ConcurrentMap<Channel, ClientChannelInfo> channelTable =
this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
+ // Make sure channelTable will NOT be cleaned by
#scanNotActiveChannel
+ channelTable.put(clientChannelInfo.getChannel(),
clientChannelInfo);
ConcurrentMap<Channel, ClientChannelInfo> prev =
this.groupChannelTable.putIfAbsent(group, channelTable);
- if (null != prev) {
+ if (null == prev) {
+ // Add client-id to channel mapping for new producer group
+ clientChannelTable.put(clientChannelInfo.getClientId(),
clientChannelInfo.getChannel());
+ } else {
channelTable = prev;
}
}
clientChannelInfoFound =
channelTable.get(clientChannelInfo.getChannel());
+ // Add client-channel info to existing producer group
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(),
clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(),
clientChannelInfo.getChannel());
- log.info("new producer connected, group: {} channel: {}", group,
- clientChannelInfo.toString());
+ log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
-
+ // Refresh existing client-channel-info update-timestamp
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
@@ -193,8 +198,7 @@ public class ProducerManager {
ClientChannelInfo old =
channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
- log.info("unregister a producer[{}] from groupChannelTable
{}", group,
- clientChannelInfo.toString());
+ log.info("unregister a producer[{}] from groupChannelTable
{}", group, clientChannelInfo.toString());
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group,
clientChannelInfo);
}