This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f7878d1b7 [ISSUE #5305] fix proxy TopicRouteService cache bug (#5306)
f7878d1b7 is described below

commit f7878d1b7cdf7ffe10339078a7dc40c1beb67642
Author: fuyou001 <[email protected]>
AuthorDate: Fri Oct 14 11:27:08 2022 +0800

    [ISSUE #5305] fix proxy TopicRouteService cache bug (#5306)
    
    * [ISSUE #5305] fix proxy TopicRouteService cache bug
    
    * [ISSUE #5305] add log
    
    * [ISSUE #5305] add  cache unit test
---
 .../proxy/service/route/TopicRouteService.java     | 11 +++++
 .../route/ClusterTopicRouteServiceTest.java        | 49 ++++++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index b1e4517fe..85e5e39bd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 public abstract class TopicRouteService extends AbstractStartAndShutdown {
@@ -88,6 +89,16 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
                         throw e;
                     }
                 }
+
+                @Override public @Nullable MessageQueueView reload(@NonNull String key,
+                    @NonNull MessageQueueView oldValue) throws Exception {
+                    try {
+                        return load(key);
+                    } catch (Exception e) {
+                        log.warn(String.format("reload topic route from namesrv. topic: %s", key), e);
+                        return oldValue;
+                    }
+                }
             });
 
         this.init();
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
index 2a5d3189e..b0baa3e9b 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
@@ -17,14 +17,24 @@
 
 package org.apache.rocketmq.proxy.service.route;
 
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.net.HostAndPort;
 import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
+import static org.assertj.core.api.Assertions.assertThat;
 import org.assertj.core.util.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -67,4 +77,43 @@ public class ClusterTopicRouteServiceTest extends BaseServiceTest {
         assertEquals(1, proxyTopicRouteData.getBrokerDatas().size());
         assertEquals(addressList, proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID));
     }
+
+    @Test
+    public void testTopicRouteCaffeineCache() throws InterruptedException {
+        String key = "abc";
+        String value = key;
+        final AtomicBoolean throwException = new AtomicBoolean();
+        ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
+            10, 10, 30L, TimeUnit.SECONDS, "test", 10);
+        LoadingCache<String /* topicName */, String> topicCache = Caffeine.newBuilder().maximumSize(30).
+            refreshAfterWrite(2, TimeUnit.SECONDS).executor(cacheRefreshExecutor).build(new CacheLoader<String, String>() {
+                @Override public @Nullable String load(@NonNull String key) throws Exception {
+                    try {
+                        if (throwException.get()) {
+                            throw new RuntimeException();
+                        } else {
+                            throwException.set(true);
+                            return value;
+                        }
+                    } catch (Exception e) {
+                        if (TopicRouteHelper.isTopicNotExistError(e)) {
+                            return "";
+                        }
+                        throw e;
+                    }
+                }
+
+                @Override
+                public @Nullable String reload(@NonNull String key, @NonNull String oldValue) throws Exception {
+                    try {
+                        return load(key);
+                    } catch (Exception e) {
+                        return oldValue;
+                    }
+                }
+            });
+        assertThat(value).isEqualTo(topicCache.get(key));
+        TimeUnit.SECONDS.sleep(5);
+        assertThat(value).isEqualTo(topicCache.get(key));
+    }
 }
\ No newline at end of file

Reply via email to