This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new 12078a7c4 Fix back pressure and try-later replies missing hasMore bit
reset error in inbox fetching. (#234)
12078a7c4 is described below
commit 12078a7c4997f9f688c092b7b5e000b72fc1f8ef
Author: Gu Jiawei <[email protected]>
AuthorDate: Sun Mar 22 19:22:08 2026 -0700
Fix back pressure and try-later replies missing hasMore bit reset error in
inbox fetching. (#234)
---
.../bifromq/inbox/server/InboxFetchPipeline.java | 44 +++++++++++++---------
.../server/InboxFetchPipelineMappingTest.java | 34 +++++++++++++++++
2 files changed, 61 insertions(+), 17 deletions(-)
diff --git
a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
index 7259742c9..2b67c1899 100644
---
a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
+++
b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/InboxFetchPipeline.java
@@ -240,26 +240,36 @@ final class InboxFetchPipeline extends
AckStream<InboxFetchHint, InboxFetched> i
.setInboxId(inboxId)
.setIncarnation(incarnation)
.setFetched(fetched).build());
- if (fetched.getQos0MsgCount() > 0 ||
fetched.getSendBufferMsgCount() > 0) {
- if (fetched.getQos0MsgCount() > 0) {
- fetchState.lastFetchQoS0Seq.set(
-
fetched.getQos0Msg(fetched.getQos0MsgCount() - 1).getSeq());
+ Fetched.Result result = fetched.getResult();
+ switch (result) {
+ case OK -> {
+ if (fetched.getQos0MsgCount() > 0 ||
fetched.getSendBufferMsgCount() > 0) {
+ if (fetched.getQos0MsgCount() > 0) {
+ fetchState.lastFetchQoS0Seq.set(
+
fetched.getQos0Msg(fetched.getQos0MsgCount() - 1).getSeq());
+ }
+ int fetchedCount = 0;
+ if (fetched.getSendBufferMsgCount() > 0) {
+ fetchedCount +=
fetched.getSendBufferMsgCount();
+
fetchState.downStreamCapacity.accumulateAndGet(
+ fetched.getSendBufferMsgCount(),
+ (a, b) -> a == NOT_KNOWN_CAPACITY
? a : Math.max(a - b, 0));
+ fetchState.lastFetchSendBufferSeq.set(
+
fetched.getSendBufferMsg(fetched.getSendBufferMsgCount() - 1).getSeq());
+ }
+ fetchState.hasMore.set(fetchedCount >=
request.params().getMaxFetch()
+ || fetchState.signalFetchTS.get() >
fetchTS);
+ } else {
+
fetchState.hasMore.set(fetchState.signalFetchTS.get() > fetchTS);
+ }
}
- int fetchedCount = 0;
- if (fetched.getSendBufferMsgCount() > 0) {
- fetchedCount +=
fetched.getSendBufferMsgCount();
-
fetchState.downStreamCapacity.accumulateAndGet(fetched.getSendBufferMsgCount(),
- (a, b) -> a == NOT_KNOWN_CAPACITY ? a :
Math.max(a - b, 0));
- fetchState.lastFetchSendBufferSeq.set(
-
fetched.getSendBufferMsg(fetched.getSendBufferMsgCount() - 1).getSeq());
- }
- fetchState.hasMore.set(fetchedCount >=
request.params().getMaxFetch()
- || fetchState.signalFetchTS.get() > fetchTS);
- } else {
-
fetchState.hasMore.set(fetchState.signalFetchTS.get() > fetchTS);
+ case BACK_PRESSURE_REJECTED, TRY_LATER ->
fetchState.hasMore.set(true);
+ default -> fetchState.hasMore.set(false);
}
fetchState.fetching.set(false);
- if (fetchState.downStreamCapacity.get() > 0 &&
fetchState.hasMore.get()) {
+ if (result == Fetched.Result.OK
+ && fetchState.downStreamCapacity.get() > 0
+ && fetchState.hasMore.get()) {
fetch(sessionId);
}
} catch (Throwable t) {
diff --git
a/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
b/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
index cff671dfa..fa13c6bb7 100644
---
a/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
+++
b/bifromq-inbox/bifromq-inbox-server/src/test/java/org/apache/bifromq/inbox/server/InboxFetchPipelineMappingTest.java
@@ -212,6 +212,40 @@ public class InboxFetchPipelineMappingTest {
pipeline.close();
}
+ @Test
+ public void shouldRetryAfterTryLaterOnHint() {
+ InboxFetcherRegistry registry = new InboxFetcherRegistry();
+ TestFetcher fetcher = new TestFetcher();
+ InboxFetchPipeline pipeline = new InboxFetchPipeline(responseObserver,
fetcher, registry);
+
+ long sessionId = 3503L;
+ pipeline.onNext(hint(sessionId, 2));
+
+ FetchRequest firstRequest = fetcher.awaitRequest();
+ assertNotNull(firstRequest);
+
+ fetcher.completeNext(Fetched.newBuilder()
+ .setResult(Fetched.Result.TRY_LATER)
+ .build());
+
+ await().until(() -> {
+ synchronized (received) {
+ return !received.isEmpty();
+ }
+ });
+
+ pipeline.onNext(hint(sessionId, 2));
+
+ FetchRequest retryRequest = fetcher.awaitRequest();
+ assertNotNull(retryRequest);
+
+ fetcher.completeNext(Fetched.newBuilder()
+ .setResult(Fetched.Result.OK)
+ .build());
+
+ pipeline.close();
+ }
+
@Test
public void shouldCleanStaleSessionIdWhenFetchStateMissing() throws
Exception {
InboxFetcherRegistry registry = new InboxFetcherRegistry();