YinY1 opened a new issue, #25145: URL: https://github.com/apache/pulsar/issues/25145
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

Broker and client run on the same machine and use the same Java installation.

- OS: Linux version 5.4.241-1-tlinux4-0017.16 (mockbuild@VM-81-152-TS3) (gcc version 8.5.0 20210514 (Tencent 8.5.0-23) (GCC)) SMP Thu Dec 12 21:16:37 CST 2024
- Broker version: 4.1.1
- Client version: 4.1.1 (Java)
- Java version:
  - openjdk version "24.0.2" 2025-07-15
  - OpenJDK Runtime Environment Temurin-24.0.2+12 (build 24.0.2+12)
  - OpenJDK 64-Bit Server VM Temurin-24.0.2+12 (build 24.0.2+12, mixed mode, sharing)

### Issue Description

# What happened

## What I did

Continuously produced a certain number of messages to a **partitioned topic**, consumed them with **two subscriptions [sub-1, sub-2]** *individually*, and checked their consistency. Consistent means a message has been sent and **received** from all subscriptions, so the check runs after a message is received and **before** it is acknowledged.

Options I used (defaults for anything not mentioned):

- producer
  - batchingMaxMessages = 1000
- consumer
  - enableBatchIndexAcknowledgment = true
  - ack mode = Individual
  - acknowledgmentGroupTime = 100ms
  - maxAcknowledgmentGroupSize = 1000

## Expected

Every produced message should be received from all subscriptions. If 1,000,000 messages are produced, I should get:

```
--- Total Acked Messages per Subscription ---
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-2]: 1000000 acks
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-1]: 1000000 acks
```

## Happened

**Occasionally**, one message with a random batch index is missing:

```
--- Total Acked Messages per Subscription ---
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-2]: 999999 acks
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-1]: 1000000 acks
2026-01-14 11:56:07.351 [main] ERROR org.example.Main - [0:123:0:45] not received from [sub-2]!
```

# Why a bug

If `enableBatchIndexAcknowledgment = false` is set, or `acknowledgmentGroupTime = 0`, this does not occur. Moreover, the broker keeps a **backlog** pointing at these entries, yet `consumer.receive()` never returns the missing message, even after calling `redeliverUnacknowledgedMessages()`.
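Below is a minimal sketch of how these two observations can be checked. It is not part of the original test code: the admin URL, topic name, and class name are placeholders, and `consumer` stands for one of the consumers from the reproduction code further down.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;

public class BacklogAndRedeliveryCheck {

    // Placeholder endpoint and topic; adjust to the actual deployment.
    static final String ADMIN_URL = "http://127.0.0.1:8080";
    static final String TOPIC = "persistent://public/default/partitioned_topic";

    /** Print the per-subscription backlog the broker reports for the partitioned topic. */
    static void printBacklog() throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(ADMIN_URL).build()) {
            PartitionedTopicStats stats = admin.topics().getPartitionedStats(TOPIC, false);
            stats.getSubscriptions().forEach((sub, subStats) ->
                    System.out.printf("subscription=%s msgBacklog=%d%n", sub, subStats.getMsgBacklog()));
        }
    }

    /**
     * Ask the broker to redeliver everything still unacknowledged on the consumer,
     * then poll with a timeout. In the scenario described above, the missing
     * batch-index message is still not returned even though a backlog remains.
     */
    static void tryRedelivery(Consumer<String> consumer) throws Exception {
        consumer.redeliverUnacknowledgedMessages();
        Message<String> msg;
        while ((msg = consumer.receive(5, TimeUnit.SECONDS)) != null) {
            System.out.printf("redelivered %s%n", msg.getMessageId());
        }
    }
}
```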
### Error messages

With DEBUG logging enabled for `org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker`, I found:

```text
Flushing pending acks to broker: last-cumulative-ack: [] -- individual-acks: [] -- individual-batch-index-acks: [(0, 123, {})]
```

### Reproducing the issue

Pseudocode:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import lombok.SneakyThrows;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class PulsarBatchAckPseudoDemo {

    static Set<MessageId> sentMessageIds = ConcurrentHashMap.newKeySet();
    static Map<MessageId, Set<String>> receiptTracker = new ConcurrentHashMap<>();
    static Map<String, AtomicLong> ackCounters =
            Map.of("sub-1", new AtomicLong(0), "sub-2", new AtomicLong(0));

    @SneakyThrows
    public static void main(String[] args) {
        // Assume a partitioned topic is already created.
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();

        // Run producer and consumers concurrently in background threads.
        new Thread(() -> producerTask(client)).start();
        new Thread(() -> consumerTask(client, "sub-1")).start();
        new Thread(() -> consumerTask(client, "sub-2")).start();

        // Let the test run for a while, then verify (consumers keep polling in the background).
        Thread.sleep(90_000); // Wait for a generous amount of time.

        // --- Verification ---
        System.out.println("--- Total Acked Messages per Subscription ---");
        System.out.printf("Subscription [sub-1]: %d acks%n", ackCounters.get("sub-1").get());
        System.out.printf("Subscription [sub-2]: %d acks%n", ackCounters.get("sub-2").get());

        // Find and report any message that wasn't received by BOTH subscriptions.
        for (MessageId sentId : sentMessageIds) {
            Set<String> receivedBy = receiptTracker.getOrDefault(sentId, Collections.emptySet());
            if (receivedBy.size() < 2) {
                if (!receivedBy.contains("sub-1")) {
                    System.err.printf("[%s] not received from [sub-1]!%n", sentId);
                }
                if (!receivedBy.contains("sub-2")) {
                    System.err.printf("[%s] not received from [sub-2]!%n", sentId);
                }
            }
        }
    }

    // --- Producer Task Logic ---
    @SneakyThrows
    static void producerTask(PulsarClient client) {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("partitioned_topic")
                .batchingMaxMessages(1000)
                .create();
        for (int i = 0; i < 1_000_000; i++) {
            // Asynchronously send and register the MessageId upon completion.
            producer.sendAsync("message-payload-" + i).thenAccept(messageId -> {
                sentMessageIds.add(messageId);
                receiptTracker.put(messageId, ConcurrentHashMap.newKeySet());
            });
        }
    }

    // --- Consumer Task Logic ---
    @SneakyThrows
    static void consumerTask(PulsarClient client, String subscriptionName) {
        // Ack mode is left at the default (Individual).
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("partitioned_topic")
                .subscriptionName(subscriptionName)
                .enableBatchIndexAcknowledgment(true)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
                .maxAcknowledgmentGroupSize(1000)
                .subscribe();
        while (true) { // Run until the process exits.
            Message<String> message = consumer.receive();
            // Step 1: Mark as received BEFORE acknowledging.
            Set<String> receipts = receiptTracker.get(message.getMessageId());
            if (receipts != null) {
                receipts.add(subscriptionName);
            }
            // Step 2: Increment the counter for the final report.
            ackCounters.get(subscriptionName).incrementAndGet();
            // Step 3: Acknowledge the message individually.
            consumer.acknowledge(message);
        }
    }
}
```

### Additional information

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
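For anyone who wants to retry this without the full demo above, here is a stripped-down sketch of the consumer configuration involved. It is not from the original report: the service URL, topic, and subscription name are placeholders, the producer still needs batching enabled (`batchingMaxMessages` > 1), and everything not shown stays at its default.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class MinimalAckConfig {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650") // placeholder broker address
                .build();

        // Per the report, the problem needs batch index acks combined with grouped acks;
        // disabling either one (false / group time 0) makes it disappear.
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("partitioned_topic")                          // placeholder topic
                .subscriptionName("sub-1")                           // placeholder subscription
                .enableBatchIndexAcknowledgment(true)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
                .maxAcknowledgmentGroupSize(1000)
                .subscribe();

        // ... receive and individually acknowledge as in the reproduction code above ...

        consumer.close();
        client.close();
    }
}
```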
