This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new 12078a7c4 Fix back pressure and try-later replies missing hasMore bit reset error in inbox fetching. (#234)
12078a7c4 is described below

commit 12078a7c4997f9f688c092b7b5e000b72fc1f8ef
Author: Gu Jiawei <[email protected]>
AuthorDate: Sun Mar 22 19:22:08 2026 -0700

    Fix back pressure and try-later replies missing hasMore bit reset error in inbox fetching. (#234)
---
 .../bifromq/inbox/server/InboxFetchPipeline.java   | 44 +++++++++++++---------
 .../server/InboxFetchPipelineMappingTest.java      | 34 +++++++++++++++++
 2 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
index 7259742c9..2b67c1899 100644
--- a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
+++ b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
@@ -240,26 +240,36 @@ final class InboxFetchPipeline extends AckStream<InboxFetchHint, InboxFetched> i
                             .setInboxId(inboxId)
                             .setIncarnation(incarnation)
                             .setFetched(fetched).build());
-                        if (fetched.getQos0MsgCount() > 0 || 
fetched.getSendBufferMsgCount() > 0) {
-                            if (fetched.getQos0MsgCount() > 0) {
-                                fetchState.lastFetchQoS0Seq.set(
-                                    
fetched.getQos0Msg(fetched.getQos0MsgCount() - 1).getSeq());
+                        Fetched.Result result = fetched.getResult();
+                        switch (result) {
+                            case OK -> {
+                                if (fetched.getQos0MsgCount() > 0 || 
fetched.getSendBufferMsgCount() > 0) {
+                                    if (fetched.getQos0MsgCount() > 0) {
+                                        fetchState.lastFetchQoS0Seq.set(
+                                            
fetched.getQos0Msg(fetched.getQos0MsgCount() - 1).getSeq());
+                                    }
+                                    int fetchedCount = 0;
+                                    if (fetched.getSendBufferMsgCount() > 0) {
+                                        fetchedCount += 
fetched.getSendBufferMsgCount();
+                                        
fetchState.downStreamCapacity.accumulateAndGet(
+                                            fetched.getSendBufferMsgCount(),
+                                            (a, b) -> a == NOT_KNOWN_CAPACITY 
? a : Math.max(a - b, 0));
+                                        fetchState.lastFetchSendBufferSeq.set(
+                                            
fetched.getSendBufferMsg(fetched.getSendBufferMsgCount() - 1).getSeq());
+                                    }
+                                    fetchState.hasMore.set(fetchedCount >= 
request.params().getMaxFetch()
+                                        || fetchState.signalFetchTS.get() > 
fetchTS);
+                                } else {
+                                    
fetchState.hasMore.set(fetchState.signalFetchTS.get() > fetchTS);
+                                }
                             }
-                            int fetchedCount = 0;
-                            if (fetched.getSendBufferMsgCount() > 0) {
-                                fetchedCount += 
fetched.getSendBufferMsgCount();
-                                
fetchState.downStreamCapacity.accumulateAndGet(fetched.getSendBufferMsgCount(),
-                                    (a, b) -> a == NOT_KNOWN_CAPACITY ? a : 
Math.max(a - b, 0));
-                                fetchState.lastFetchSendBufferSeq.set(
-                                    
fetched.getSendBufferMsg(fetched.getSendBufferMsgCount() - 1).getSeq());
-                            }
-                            fetchState.hasMore.set(fetchedCount >= 
request.params().getMaxFetch()
-                                || fetchState.signalFetchTS.get() > fetchTS);
-                        } else {
-                            
fetchState.hasMore.set(fetchState.signalFetchTS.get() > fetchTS);
+                            case BACK_PRESSURE_REJECTED, TRY_LATER -> 
fetchState.hasMore.set(true);
+                            default -> fetchState.hasMore.set(false);
                         }
                         fetchState.fetching.set(false);
-                        if (fetchState.downStreamCapacity.get() > 0 && 
fetchState.hasMore.get()) {
+                        if (result == Fetched.Result.OK
+                            && fetchState.downStreamCapacity.get() > 0
+                            && fetchState.hasMore.get()) {
                             fetch(sessionId);
                         }
                     } catch (Throwable t) {
diff --git a/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java b/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
index cff671dfa..fa13c6bb7 100644
--- a/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
+++ b/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
@@ -212,6 +212,40 @@ public class InboxFetchPipelineMappingTest {
         pipeline.close();
     }
 
+    @Test
+    public void shouldRetryAfterTryLaterOnHint() {
+        InboxFetcherRegistry registry = new InboxFetcherRegistry();
+        TestFetcher fetcher = new TestFetcher();
+        InboxFetchPipeline pipeline = new InboxFetchPipeline(responseObserver, 
fetcher, registry);
+
+        long sessionId = 3503L;
+        pipeline.onNext(hint(sessionId, 2));
+
+        FetchRequest firstRequest = fetcher.awaitRequest();
+        assertNotNull(firstRequest);
+
+        fetcher.completeNext(Fetched.newBuilder()
+            .setResult(Fetched.Result.TRY_LATER)
+            .build());
+
+        await().until(() -> {
+            synchronized (received) {
+                return !received.isEmpty();
+            }
+        });
+
+        pipeline.onNext(hint(sessionId, 2));
+
+        FetchRequest retryRequest = fetcher.awaitRequest();
+        assertNotNull(retryRequest);
+
+        fetcher.completeNext(Fetched.newBuilder()
+            .setResult(Fetched.Result.OK)
+            .build());
+
+        pipeline.close();
+    }
+
     @Test
     public void shouldCleanStaleSessionIdWhenFetchStateMissing() throws 
Exception {
         InboxFetcherRegistry registry = new InboxFetcherRegistry();

Reply via email to