YinY1 commented on issue #25145:
URL: https://github.com/apache/pulsar/issues/25145#issuecomment-3798085741

   @berg223 I set
`subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)` (otherwise
using your full code), started the threads in the following order, and reproduced the issue:
   ```java
           Thread producerThread = startThread(() -> producerTask(client));
           Thread consumerThread1 = startThread(() -> consumerTask(client, "sub-1"));
           Thread consumerThread2 = startThread(() -> consumerTask(client, "sub-2"));
   
           producerThread.join();
           consumerThread1.join();
           consumerThread2.join();
   ```
   
   The inconsistency showed up in iteration 59:
   ```
   passed check iteration 58
   Starting producer task...
   Starting consumer task for subscription [sub-1]...
   Starting consumer task for subscription [sub-2]...
   Finished producer task.
   Finished consumer task for subscription [sub-2].
   Finished consumer task for subscription [sub-1].
   --- Total Acked Messages per Subscription ---
   Subscription [sub-1]: 1000000 acks
   Subscription [sub-2]: 999999 acks
   [9436:4283:2:905] not received from [sub-2]!
   Shutting down...
   Done.
   Inconsistency detected!
   ====== Iteration 1 failed with exit code 1 ======
   ```
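
   As a reading aid, the missing ID `[9436:4283:2:905]` appears to follow the client's `ledgerId:entryId:partitionIndex:batchIndex` string form, which would point at partition 2 of the topic. The sketch below splits such an ID and derives the partition topic name to inspect. The class `MissingMessageId` and its methods are hypothetical helpers, not part of the test code or of the Pulsar API, and the four-part format is an assumption based on the log line above.

   ```java
   // Sketch only: hypothetical helper, assumes the four-part
   // "ledgerId:entryId:partitionIndex:batchIndex" form seen in the log above.
   final class MissingMessageId {
       final long ledgerId;
       final long entryId;
       final int partitionIndex;
       final int batchIndex;

       private MissingMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
           this.partitionIndex = partitionIndex;
           this.batchIndex = batchIndex;
       }

       // Accepts the ID with or without the surrounding square brackets.
       static MissingMessageId parse(String text) {
           String trimmed = text.trim();
           if (trimmed.startsWith("[") && trimmed.endsWith("]")) {
               trimmed = trimmed.substring(1, trimmed.length() - 1);
           }
           String[] parts = trimmed.split(":");
           if (parts.length != 4) {
               throw new IllegalArgumentException("expected 4 colon-separated fields: " + text);
           }
           return new MissingMessageId(
                   Long.parseLong(parts[0]),
                   Long.parseLong(parts[1]),
                   Integer.parseInt(parts[2]),
                   Integer.parseInt(parts[3]));
       }

       // Appends the "-partition-N" suffix used by partitioned topics.
       String partitionTopic(String partitionedTopic) {
           return partitionedTopic + "-partition-" + partitionIndex;
       }

       public static void main(String[] args) {
           MissingMessageId id = parse("[9436:4283:2:905]");
           // prints persistent://public/default/partitioned_topic_25145-partition-2
           System.out.println(id.partitionTopic("persistent://public/default/partitioned_topic_25145"));
       }
   }
   ```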
   
   <details>
   <summary> Then I fetched the internal stats for that partition: </summary>
   
   ```
   $ pulsar-admin --admin-url http://pulsar-mini-proxy.pulsar:80 topics stats-internal persistent://public/default/partitioned_topic_25145-partition-2
   
   {
     "entriesAddedCounter" : 4300,
     "numberOfEntries" : 4300,
     "totalSize" : 132530392,
     "currentLedgerEntries" : 4300,
     "currentLedgerSize" : 132530392,
     "lastLedgerCreatedTimestamp" : "2026-01-26T02:23:49.505Z",
     "waitingCursorsCount" : 0,
     "pendingAddEntriesCount" : 0,
     "lastConfirmedEntry" : "9436:4299",
     "state" : "LedgerOpened",
     "ledgers" : [ {
       "ledgerId" : 9436,
       "entries" : 0,
       "size" : 0,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "cursors" : {
       "sub-2" : {
         "markDeletePosition" : "9436:4282",
         "readPosition" : "9436:4300",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 4299,
         "cursorLedger" : 9458,
         "cursorLedgerLastEntry" : 243,
         "individuallyDeletedMessages" : "[(9436:4283..9436:4299]]",
         "lastLedgerSwitchTimestamp" : "2026-01-26T02:23:50.491Z",
         "state" : "Open",
         "active" : false,
         "numberOfEntriesSinceFirstNotAckedMessage" : 18,
         "totalNonContiguousDeletedMessagesRange" : 1,
         "subscriptionHavePendingRead" : false,
         "subscriptionHavePendingReplayRead" : false,
         "properties" : { }
       },
       "sub-1" : {
         "markDeletePosition" : "9436:4299",
         "readPosition" : "9436:4300",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 4300,
         "cursorLedger" : 9468,
         "cursorLedgerLastEntry" : 246,
         "individuallyDeletedMessages" : "[]",
         "lastLedgerSwitchTimestamp" : "2026-01-26T02:23:50.49Z",
         "state" : "Open",
         "active" : false,
         "numberOfEntriesSinceFirstNotAckedMessage" : 1,
         "totalNonContiguousDeletedMessagesRange" : 0,
         "subscriptionHavePendingRead" : false,
         "subscriptionHavePendingReplayRead" : false,
         "properties" : { }
       }
     },
     "schemaLedgers" : [ {
       "ledgerId" : 9422,
       "entries" : 1,
       "size" : 119,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "compactedLedger" : {
       "ledgerId" : -1,
       "entries" : -1,
       "size" : -1,
       "offloaded" : false,
       "underReplicated" : false
     }
   }
   ```
   </details>
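
   In these stats, `sub-2` has `markDeletePosition` `9436:4282` and a single individually deleted range `(9436:4283..9436:4299]`. Assuming the range is open on the left and closed on the right, entry 4283 is the only entry not acknowledged on `sub-2`, which lines up with the missing message ID. Below is a minimal sketch of that check. `CursorHoles` is a hypothetical helper, it assumes all positions sit in one ledger, and it parses the strings exactly as they are printed above.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Sketch only: hypothetical helper, assumes a single ledger and
   // open-closed "(ledger:entry..ledger:entry]" ranges as printed by stats-internal.
   final class CursorHoles {
       private static final Pattern RANGE =
               Pattern.compile("\\((\\d+):(\\d+)\\.\\.(\\d+):(\\d+)\\]");

       // Extracts the entry id from a "ledgerId:entryId" position string.
       static long entryOf(String position) {
           return Long.parseLong(position.substring(position.indexOf(':') + 1));
       }

       // Returns entry ids after markDeletePosition, up to lastConfirmedEntry,
       // that are not covered by any individually deleted range.
       static List<Long> unackedEntries(String markDeletePosition,
                                        String lastConfirmedEntry,
                                        String individuallyDeleted) {
           long first = entryOf(markDeletePosition) + 1;
           long last = entryOf(lastConfirmedEntry);
           boolean[] acked = new boolean[(int) Math.max(0, last - first + 1)];

           Matcher m = RANGE.matcher(individuallyDeleted);
           while (m.find()) {
               long lowerExclusive = Long.parseLong(m.group(2));
               long upperInclusive = Long.parseLong(m.group(4));
               long from = Math.max(lowerExclusive + 1, first);
               long to = Math.min(upperInclusive, last);
               for (long e = from; e <= to; e++) {
                   acked[(int) (e - first)] = true;
               }
           }

           List<Long> holes = new ArrayList<>();
           for (int i = 0; i < acked.length; i++) {
               if (!acked[i]) {
                   holes.add(first + i);
               }
           }
           return holes;
       }

       public static void main(String[] args) {
           // Values copied from the sub-2 cursor above; prints [4283]
           System.out.println(unackedEntries(
                   "9436:4282", "9436:4299", "[(9436:4283..9436:4299]]"));
       }
   }
   ```

   With the `sub-1` values (`markDeletePosition` `9436:4299`, no individually deleted ranges) the same check returns an empty list.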
   
   So I switched to a different order on a new topic, starting the consumers only after the producer had finished:
   ```java
               Thread producerThread = startThread(() -> producerTask(client));
               producerThread.join();
   
               Thread consumerThread1 = startThread(() -> consumerTask(client, "sub-1"));
               Thread consumerThread2 = startThread(() -> consumerTask(client, "sub-2"));
   
               // wait until all threads complete
               consumerThread1.join();
               consumerThread2.join();
   ```
   
   But I still reproduced the problem, this time in iteration 92:
   ```
   passed check iteration 91
   Starting producer task...
   Finished producer task.
   Starting consumer task for subscription [sub-1]...
   Starting consumer task for subscription [sub-2]...
   Finished consumer task for subscription [sub-2].
   Finished consumer task for subscription [sub-1].
   --- Total Acked Messages per Subscription ---
   Subscription [sub-1]: 1000000 acks
   Subscription [sub-2]: 999999 acks
   [9473:5795:7:941] not received from [sub-2]!
   Shutting down...
   Done.
   Inconsistency detected!
   ```
   <details>
   <summary> The internal stats are similar: </summary>
   
   ```
   $ pulsar-admin --admin-url http://pulsar-mini-proxy.pulsar:80 topics stats-internal persistent://public/default/partitioned_topic_25145_2-partition-7
   {
     "entriesAddedCounter" : 5798,
     "numberOfEntries" : 5798,
     "totalSize" : 178354015,
     "currentLedgerEntries" : 5798,
     "currentLedgerSize" : 178354015,
     "lastLedgerCreatedTimestamp" : "2026-01-26T03:38:11.802Z",
     "waitingCursorsCount" : 0,
     "pendingAddEntriesCount" : 0,
     "lastConfirmedEntry" : "9473:5797",
     "state" : "LedgerOpened",
     "ledgers" : [ {
       "ledgerId" : 9473,
       "entries" : 0,
       "size" : 0,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "cursors" : {
       "sub-2" : {
         "markDeletePosition" : "9473:5794",
         "readPosition" : "9473:5798",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 5797,
         "cursorLedger" : 9504,
         "cursorLedgerLastEntry" : 220,
         "individuallyDeletedMessages" : "[(9473:5795..9473:5797]]",
         "lastLedgerSwitchTimestamp" : "2026-01-26T03:38:14.776Z",
         "state" : "Open",
         "active" : false,
         "numberOfEntriesSinceFirstNotAckedMessage" : 4,
         "totalNonContiguousDeletedMessagesRange" : 1,
         "subscriptionHavePendingRead" : false,
         "subscriptionHavePendingReplayRead" : false,
         "properties" : { }
       },
       "sub-1" : {
         "markDeletePosition" : "9473:5797",
         "readPosition" : "9473:5798",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 5798,
         "cursorLedger" : 9506,
         "cursorLedgerLastEntry" : 231,
         "individuallyDeletedMessages" : "[]",
         "lastLedgerSwitchTimestamp" : "2026-01-26T03:38:14.777Z",
         "state" : "Open",
         "active" : false,
         "numberOfEntriesSinceFirstNotAckedMessage" : 1,
         "totalNonContiguousDeletedMessagesRange" : 0,
         "subscriptionHavePendingRead" : false,
         "subscriptionHavePendingReplayRead" : false,
         "properties" : { }
       }
     },
     "schemaLedgers" : [ {
       "ledgerId" : 9472,
       "entries" : 1,
       "size" : 121,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "compactedLedger" : {
       "ledgerId" : -1,
       "entries" : -1,
       "size" : -1,
       "offloaded" : false,
       "underReplicated" : false
     }
   }
   ```
   </details>
   

