[I] [Question] 网关向下游下发请求失败时,基于Reactor响应式流实现单机版本的异步非阻塞重试策略 [shenyu]</span></a></span> </h1> <p class="darkgray font13"> <span class="sender pipe"><a href="/search?l=notifications@shenyu.apache.org&q=from:%22via+GitHub%22" rel="nofollow"><span itemprop="author" itemscope itemtype="http://schema.org/Person"><span itemprop="name">via GitHub</span></span></a></span> <span class="date"><a href="/search?l=notifications@shenyu.apache.org&q=date:20250206" rel="nofollow">Thu, 06 Feb 2025 01:03:00 -0800</a></span> </p> </div> <div itemprop="articleBody" class="msgBody"> <!--X-Body-of-Message--> <pre> CaoMaoLuFei opened a new issue, #5920: URL: <a rel="nofollow" href="https://github.com/apache/shenyu/issues/5920">https://github.com/apache/shenyu/issues/5920</a></pre><pre> ### Question **Divide或HTTP Client插件中,向下游服务发送请求失败后的重试策略,当前实现方案是比较简单的,建议基于Reactor响应式流实现单机版本的异步非阻塞重试策略。** 实现效果: ● 异步非阻塞,不影响主线程 ● 支持自定义重试次数、重试间隔、重试上限 ● 支持按任务失败条件出发重试(eg: 指定异常重试) ● 支持异步获取任务执行结果,直到重试成功或者重试超限 ● 轻量级,不依赖第三方重试调度中间件(如MQ,ScheduleX),不依赖Spring框架本身提供的定时调度能力。 ● 多种重试策略:自定义策略(自定义退避序列)、固定间隔重试策略、默认重试策略等。 ## 当前失败重试代码如下 ![Image](<a rel="nofollow" href="https://github.com/user-attachments/assets/50beffcd-2ded-427d-845e-2fbd420ca169">https://github.com/user-attachments/assets/50beffcd-2ded-427d-845e-2fbd420ca169</a>) 如图所示,这里是For循环3次比较简单的失败重试。 ## 基于Reactor响应式流实现单机版本的异步非阻塞重试策略 实现示例代码如下: ### RetryUtils 工具类 ```java /** * 重试工具类 */ public class RetryUtils { /** * 记录重试次数 */ private static final AtomicInteger retryCount = new AtomicInteger(0); /** * 执行并返回响应流 */ public static Mono<String> execute(Object... param) { return execute(RetryBackoffSpecEnum.DEFAULT_BACKOFF, param); } /** * 执行并返回响应流(指定重试策略) */ public static Mono<String> execute( RetryBackoffSpecEnum backoffSpecEnum, Object... param) { return retryWithBackoff(doExecute(param), backoffSpecEnum); } private static Mono<String> doExecute(Object... param) { return Mono.defer(() -> { // 模拟50%的异常情况,指定异常重试 if (Math.random() < 0.5) { return Mono.error(new IllegalStateException("执行失败... " + Arrays.toString(param))); } else { return Mono.just("执行成功: " + Arrays.toString(param)); } }); } /** * @param mono 需要重试的操作 * @return 返回包含重试结果的Mono */ public static <T> Mono<T> retryWithBackoff(Mono<T> mono, RetryBackoffSpecEnum backoffSpecEnum) { RetryBackoffSpec backoffSpec = holders.get(backoffSpecEnum); return mono.doOnNext(i -> System.out.println("Processing item: " + i)) .doOnError(e -> System.err.println("Error occurred: " + e.getMessage())) .retryWhen( backoffSpec.doAfterRetry(retrySignal -> doRetry()) ) .doFinally(signalType -> { if (signalType == SignalType.ON_ERROR) { System.err.println("重试结束,最终失败"); } else if (signalType == SignalType.ON_COMPLETE) { System.out.println("重试结束,成功完成"); } resetRetryCount(); }); } public enum RetryBackoffSpecEnum { /** * 默认重试 */ DEFAULT_BACKOFF, /** * 固定重试 */ FIXED_BACKOFF, /** * 自定义重试 */ CUSTOM_BACKOFF, } private static final Map<RetryBackoffSpecEnum, RetryBackoffSpec> holders = new HashMap<>(); static { holders.put(RetryBackoffSpecEnum.DEFAULT_BACKOFF, initDefaultBackoff()); holders.put(RetryBackoffSpecEnum.FIXED_BACKOFF, initFixedBackoff()); holders.put(RetryBackoffSpecEnum.CUSTOM_BACKOFF, initCustomBackoff()); } private static RetryBackoffSpec initCustomBackoff() { // TODO 自己实现 return null; } private static RetryBackoffSpec initFixedBackoff() { return Retry.fixedDelay(5, Duration.ofSeconds(2)); } private static RetryBackoffSpec initDefaultBackoff() { return Retry.backoff(3, Duration.ofMillis(500)) .maxBackoff(Duration.ofSeconds(5)) // 只对瞬时错误进行重试 .transientErrors(true) // 添加 50% 的随机抖动到每次重试的延迟时间 .jitter(0.5d) .filter(t -> t instanceof IllegalStateException) // 当达到最大重试次数后抛出一个指定的异常 .onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> { throw new IllegalStateException("重试超限"); }); } private static void doRetry() { retryCount.incrementAndGet(); System.out.println("执行重试,重试次数: " + retryCount.get()); } private static void resetRetryCount() { retryCount.set(0); } } ``` ### 测试重试效果 ```java public static void main(String[] args) { for (int i = 1; i <= 20; i++) { Mono<String> result = RetryUtils.execute(String.format("第【%s】次调用", i)); // 订阅结果 result.subscribe( // 成功时处理结果 data -> System.out.println("Received: " + data), // 所有重试都失败时处理错误 error -> System.err.println("Final error: " + error.getMessage()), // 流完成时调用 () -> System.out.println("Completed") ); try { Thread.sleep(10000); // 给异步任务一点时间完成 System.out.println("===========================分隔符==========================="); } catch (InterruptedException ignored) { } } } ``` 使用默认重试策略时,命中IllegalStateException异常规则,重试间隔500ms,最失败大重试3次,如果重试过程中成功,则直接终止重试。 ● CASE1 第一次请求成功,不再触发重试: ![Image](<a rel="nofollow" href="https://github.com/user-attachments/assets/6501ec7e-f0f5-437a-aa94-8c7968a5d219">https://github.com/user-attachments/assets/6501ec7e-f0f5-437a-aa94-8c7968a5d219</a>) ● CASE2 首次请求执行失败,触发重试,并重试成功,终止重试: ![Image](<a rel="nofollow" href="https://github.com/user-attachments/assets/3de8be0b-ad94-4d8b-a5ee-f84942c4bf5e">https://github.com/user-attachments/assets/3de8be0b-ad94-4d8b-a5ee-f84942c4bf5e</a>) ● CASE3 首次请求执行失败,触发重试,重试次数达到上限,终止重试: ![Image](<a rel="nofollow" href="https://github.com/user-attachments/assets/8778ed5c-4999-43f7-8f6e-84f57fe51f2a">https://github.com/user-attachments/assets/8778ed5c-4999-43f7-8f6e-84f57fe51f2a</a>) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@shenyu.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org </pre> </div> <div class="msgButtons margintopdouble"> <ul class="overflow"> <li class="msgButtonItems"><a class="button buttonleft " accesskey="p" href="msg32519.html">Previous message</a></li> <li class="msgButtonItems textaligncenter"><a class="button" accesskey="c" href="thrd2.html#32523">View by thread</a></li> <li class="msgButtonItems textaligncenter"><a class="button" accesskey="i" href="mail2.html#32523">View by date</a></li> <li class="msgButtonItems textalignright"><a class="button buttonright " accesskey="n" href="msg32528.html">Next message</a></li> </ul> </div> <a name="tslice"></a> <div class="tSliceList margintopdouble"> <ul class="icons monospace"> <li class="icons-email tSliceCur"><span class="subject">[I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式流实现单机版本的异...</span> <span class="sender italic">via GitHub</span></li> <li><ul> <li class="icons-email"><span class="subject"><a href="msg32528.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32726.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32758.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32771.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32775.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32776.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32866.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> <li class="icons-email"><span class="subject"><a href="msg32891.html">Re: [I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式...</a></span> <span class="sender italic">via GitHub</span></li> </ul> </ul> </div> <div class="overflow msgActions margintopdouble"> <div class="msgReply" > <h2> Reply via email to </h2> <form method="POST" action="/mailto.php"> <input type="hidden" name="subject" value="[I] [Question] <title> 网关向下游下发请求失败时,基于Reactor响应式流实现单机版本的异步非阻塞重试策略 [shenyu]"> <input type="hidden" name="msgid" value="I_kwDOCGCHjs6o-cEB@gitbox.apache.org"> <input type="hidden" name="relpath" value="notifications@shenyu.apache.org/msg32523.html"> <input type="submit" value=" via GitHub "> </form> </div> </div> </div> <div class="aside" role="complementary"> <div class="logo"> <a href="/"><img src="/logo.png" width=247 height=88 alt="The Mail Archive"></a> </div> <form class="overflow" action="/search" method="get"> <input type="hidden" name="l" value="notifications@shenyu.apache.org"> <label class="hidden" for="q">Search the site</label> <input class="submittext" type="text" id="q" name="q" placeholder="Search notifications"> <input class="submitbutton" name="submit" type="image" src="/submit.png" alt="Submit"> </form> <div class="nav margintop" id="nav" role="navigation"> <ul class="icons font16"> <li class="icons-home"><a href="/">The Mail Archive home</a></li> <li class="icons-list"><a href="/notifications@shenyu.apache.org/">notifications - all messages</a></li> <li class="icons-about"><a href="/notifications@shenyu.apache.org/info.html">notifications - about the list</a></li> <li class="icons-expand"><a href="/search?l=notifications@shenyu.apache.org&q=subject:%22%5C%5BI%5C%5D+%5C%5BQuestion%5C%5D+%3Ctitle%3E+%E7%BD%91%E5%85%B3%E5%90%91%E4%B8%8B%E6%B8%B8%E4%B8%8B%E5%8F%91%E8%AF%B7%E6%B1%82%E5%A4%B1%E8%B4%A5%E6%97%B6%EF%BC%8C%E5%9F%BA%E4%BA%8EReactor%E5%93%8D%E5%BA%94%E5%BC%8F%E6%B5%81%E5%AE%9E%E7%8E%B0%E5%8D%95%E6%9C%BA%E7%89%88%E6%9C%AC%E7%9A%84%E5%BC%82%E6%AD%A5%E9%9D%9E%E9%98%BB%E5%A1%9E%E9%87%8D%E8%AF%95%E7%AD%96%E7%95%A5+%5C%5Bshenyu%5C%5D%22&o=newest&f=1" title="e" id="e">Expand</a></li> <li class="icons-prev"><a href="msg32519.html" title="p">Previous message</a></li> <li class="icons-next"><a href="msg32528.html" title="n">Next message</a></li> </ul> </div> <div class="listlogo margintopdouble"> </div> <div class="margintopdouble"> </div> </div> </div> <div class="footer" role="contentinfo"> <ul> <li><a href="/">The Mail Archive home</a></li> <li><a href="/faq.html#newlist">Add your mailing list</a></li> <li><a href="/faq.html">FAQ</a></li> <li><a href="/faq.html#support">Support</a></li> <li><a href="/faq.html#privacy">Privacy</a></li> <li class="darkgray">I_kwDOCGCHjs6o-cEB@gitbox.apache.org</li> </ul> </div> </body> </html> <script>(function(){function c(){var b=a.contentDocument||a.contentWindow.document;if(b){var d=b.createElement('script');d.innerHTML="window.__CF$cv$params={r:'9ec40106eb40f25b',t:'MTc3NjE4MzI3MQ=='};var a=document.createElement('script');a.src='/cdn-cgi/challenge-platform/scripts/jsd/main.js';document.getElementsByTagName('head')[0].appendChild(a);";b.getElementsByTagName('head')[0].appendChild(d)}}if(document.body){var a=document.createElement('iframe');a.height=1;a.width=1;a.style.position='absolute';a.style.top=0;a.style.left=0;a.style.border='none';a.style.visibility='hidden';document.body.appendChild(a);if('loading'!==document.readyState)c();else if(window.addEventListener)document.addEventListener('DOMContentLoaded',c);else{var e=document.onreadystatechange||function(){};document.onreadystatechange=function(b){e(b);'loading'!==document.readyState&&(document.onreadystatechange=e,c())}}}})();</script>