YinY1 opened a new issue, #25145:
URL: https://github.com/apache/pulsar/issues/25145

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   broker and client are in a same machine use same java
   
   -  OS: Linux version 5.4.241-1-tlinux4-0017.16 (mockbuild@VM-81-152-TS3) 
(gcc version 8.5.0 20210514 (Tencent 8.5.0-23) (GCC))  SMP Thu Dec 12 21:16:37 
CST 2024
   - Broker version: 4.1.1
   - Client version:  4.1.1 (Java)
   - Java version:
      - openjdk version "24.0.2" 2025-07-15
      - OpenJDK Runtime Environment Temurin-24.0.2+12 (build 24.0.2+12)
      - OpenJDK 64-Bit Server VM Temurin-24.0.2+12 (build 24.0.2+12, mixed 
mode, sharing) 
   
   ### Issue Description
   
   # What happened
   ## What I did
   Continuously produced a certain amount of  messages to a **partitioned 
topic**, and consumed those messages with **two subscriptions [sub-1, sub-2]** 
*individually*, and check their consistency. Consistent means a message has 
been sent and **received** from all subscriptions, so it will be checked after 
received and **before acknowledged**.
   options i used (use default if not mentioned)
   - producer
     - batchingMaxMessagess = 1000
   - consumer
     - enableBatchIndexAcknowledgment = true
     - ack mode = Individual
     - acknowledgmentGroupTime = 100ms
     - maxAcknowledgmentGroupSize = 1000
   
   
   ## Expected
   All messages produced could be received from all subscriptions.
   If produced 1000000, I should get
   ```
   --- Total Acked Messages per Subscription ---
   2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription 
[sub-2]: 1000000 acks
   2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription 
[sub-1]: 1000000 acks 
   ```
   ## Happened
   **Occasionally**, missed one message with a random batch index
   ```
   --- Total Acked Messages per Subscription ---
   2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription 
[sub-2]: 999999 acks
   2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription 
[sub-1]: 1000000 acks 
   2026-01-14 11:56:07.351 [main] ERROR org.example.Main - [0:123:0:45] not 
received from [sub-2]!
   ```
   # Why a bug
   If set ```enableBatchIndexAcknowledgment = true``` or set 
```acknowledgmentGroupTime = 0```,
   this will not occur.
   And broker has a **backlog** pointed to these entry, while 
```consumer.receive()``` could not get that missing message even after calling 
```redeliverUnacknowledgedMessages()```
   
   ### Error messages
   
   ```text
   if enable DEBUG for 
.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker, found
   
   Flushing pending acks to broker: last-cumulative-ack: [] -- individual-acks: 
[] -- individual-batch-index-acks: [(0, 123, {})]
   ```
   
   ### Reproducing the issue
   
   pseudocode: 
   
   ```java
   public class PulsarBatchAckPseudoDemo {
   
       static Set<MessageId> sentMessageIds = new ConcurrentHashMap.newKeySet();
       static Map<MessageId, Set<String>> receiptTracker = new 
ConcurrentHashMap<>();
       static Map<String, AtomicLong> ackCounters = Map.of("sub-1", new 
AtomicLong(0), "sub-2", new AtomicLong(0));
   
       public static void main(String[] args) {
   
           // Assume a partitioned topic is already created.
           PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
   
           // Run producer and consumers concurrently in background threads.
           startThread(() -> producerTask(client));
           startThread(() -> consumerTask(client, "sub-1"));
           startThread(() -> consumerTask(client, "sub-2"));
   
           // Let the test run for a while, then stop threads and verify.
           sleep(90_000); // Wait for a generous amount of time.
   
           // --- Verification ---
           System.out.println("--- Total Acked Messages per Subscription ---");
           System.out.printf("Subscription [sub-1]: %d acks%n", 
ackCounters.get("sub-1").get());
           System.out.printf("Subscription [sub-2]: %d acks%n", 
ackCounters.get("sub-2").get());
   
           // Find and report any message that wasn't received by BOTH 
subscriptions.
           for (MessageId sentId : sentMessageIds) {
               Set<String> receivedBy = receiptTracker.getOrDefault(sentId, 
Collections.emptySet());
               if (receivedBy.size() < 2) {
                   if (!receivedBy.contains("sub-1")) {
                       System.err.printf("[%s] not received from [sub-1]!%n", 
sentId);
                   }
                   if (!receivedBy.contains("sub-2")) {
                       System.err.printf("[%s] not received from [sub-2]!%n", 
sentId);
                   }
               }
           }
       }
   
       // --- Producer Task Logic ---
       @SneakyThrows
       static void producerTask(PulsarClient client) {
           Producer producer = 
client.newProducer().topic("partitioned_topic").batchingMaxMessages(1000).create();
   
           for (int i = 0; i < 1_000_000; i++) {
               // Asynchronously send and register the MessageId upon 
completion.
               producer.sendAsync("message-payload-" + i).thenAccept(messageId 
-> {
                   sentMessageIds.add(messageId);
                   receiptTracker.put(messageId, ConcurrentHashMap.newKeySet());
               });
           }
       }
   
       // --- Consumer Task Logic ---
       @SneakyThrows
       static void consumerTask(PulsarClient client, String subscriptionName) {
           Consumer consumer = 
client.newConsumer().topic("partitioned_topic").subscriptionName(subscriptionName)
                   
.enableBatchIndexAcknowledgement(true).acknowledgmentGroupTime(100, 
TimeUnit.MILLISECONDS)
                   .acknowledgementGroupSize(1000).subscribe();
   
           while (true) { // Run until the main thread stops it.
               Message<String> message = consumer.receive();
   
               // Step 1: Mark as received BEFORE acknowledging.
               Set<String> receipts = 
receiptTracker.get(message.getMessageId());
               if (receipts != null) {
                   receipts.add(subscriptionName);
               }
   
               // Step 2: Increment the counter for the final report.
               ackCounters.get(subscriptionName).incrementAndGet();
   
               // Step 3: Acknowledge the message individually.
               consumer.acknowledge(message);
           }
       }
   }
   ```
   
   ### Additional information
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to