This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d0a69be563 [ISSUE #6810] Fix the bug of mistakenly deleting data in
clientChannelTable when the channel expire (#7073)
d0a69be563 is described below
commit d0a69be563785ca815dc31ef1aab4c1bc5588c01
Author: zd46319 <[email protected]>
AuthorDate: Thu Jul 27 16:56:41 2023 +0800
[ISSUE #6810] Fix the bug of mistakenly deleting data in clientChannelTable
when the channel expire (#7073)
---
.../rocketmq/broker/client/ProducerManager.java | 5 +++-
.../broker/client/ProducerManagerTest.java | 34 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 52d67bf282..f9fe1193e2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -112,7 +112,10 @@ public class ProducerManager {
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
- clientChannelTable.remove(info.getClientId());
+ Channel channelInClientTable = clientChannelTable.get(info.getClientId());
+ if (channelInClientTable != null && channelInClientTable.equals(info.getChannel())) {
+ clientChannelTable.remove(info.getClientId());
+ }
log.warn(
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index dac5468c87..3d6091e02f 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -79,6 +80,39 @@ public class ProducerManagerTest {
assertThat(producerManager.findChannel("clientId")).isNull();
}
+ @Test
+ public void scanNotActiveChannelWithSameClientId() throws Exception {
+ producerManager.registerProducer(group, clientInfo);
+ Channel channel1 = Mockito.mock(Channel.class);
+ ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1,
clientInfo.getClientId(), LanguageCode.JAVA, 0);
+ producerManager.registerProducer(group, clientInfo1);
+ AtomicReference<String> groupRef = new AtomicReference<>();
+ AtomicReference<ClientChannelInfo> clientChannelInfoRef = new
AtomicReference<>();
+ producerManager.appendProducerChangeListener((event, group,
clientChannelInfo) -> {
+ switch (event) {
+ case GROUP_UNREGISTER:
+ groupRef.set(group);
+ break;
+ case CLIENT_UNREGISTER:
+ clientChannelInfoRef.set(clientChannelInfo);
+ break;
+ default:
+ break;
+ }
+ });
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
+ Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
+ field.setAccessible(true);
+ long channelExpiredTimeout = field.getLong(producerManager);
+ clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10);
+ when(channel.close()).thenReturn(mock(ChannelFuture.class));
+ producerManager.scanNotActiveChannel();
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
+ }
+
@Test
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);