gaozhangmin opened a new issue #10167:
URL: https://github.com/apache/pulsar/issues/10167


   linked issue: #8115 
   A consumer can get stuck after a new consumer is added to a Key_Shared
   subscription. The same problem was raised before in #8115 and was fixed in
   #10096, but after testing that code, we still see the consumer get stuck.
   
   To Reproduce:
   1. First, create a new Key_Shared consumer:
   java -cp pulsar-pubsub-examples.jar \
   >     io.streamnative.examples.pubsub.SyncStringConsumerExample \
   >     --topic t7 \
   >     --subscription-name example-sub \
   >     --subscription-initial-position Earliest \
   >     --subscription-type Key_Shared \
   >     --num-messages 0
   
   2. Produce data with two keys:
   java -cp pulsar-pubsub-examples.jar \
   >     io.streamnative.examples.pubsub.SyncStringProducerExample \
   >     --topic t7 \
   >     --num-keys 2 \
   >     --messages-rate 1 \
   >     --num-messages 100000000
   
   3. Add a new consumer:
   java -cp pulsar-pubsub-examples.jar \
   >     io.streamnative.examples.pubsub.SyncStringConsumerExample \
   >     --topic t7 \
   >     --subscription-name example-sub \
   >     --subscription-initial-position Earliest \
   >     --subscription-type Key_Shared \
   >     --num-messages 0
   
   The old consumer got stuck:
   
![image](https://user-images.githubusercontent.com/9278488/113959048-bcf85c00-9854-11eb-846f-556671b1c9ac.png)
   
   The new consumer received no messages:
   
![image](https://user-images.githubusercontent.com/9278488/113959086-ced9ff00-9854-11eb-942f-3c7f9b7eaa48.png)
   
   stats of topic:
   ```
   {
     "msgRateIn" : 0.9999983401194219,
     "msgThroughputIn" : 66.99988878800126,
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
     "bytesInCounter" : 53295,
     "msgInCounter" : 799,
     "bytesOutCounter" : 835,
     "msgOutCounter" : 13,
     "averageMsgSize" : 66.99999999999999,
     "msgChunkPublished" : false,
     "storageSize" : 53295,
     "backlogSize" : 53295,
     "offloadedStorageSize" : 0,
     "publishers" : [ {
       "msgRateIn" : 0.9999983401194219,
       "msgThroughputIn" : 66.99988878800126,
       "averageMsgSize" : 67.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "metadata" : { },
       "producerName" : "ys-pulsar-8-2",
       "connectedSince" : "2021-04-08T10:13:56.814+08:00",
       "clientVersion" : "2.7.1",
       "address" : "/10.89.133.11:24082"
     } ],
     "subscriptions" : {
       "example-sub" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "bytesOutCounter" : 835,
         "msgOutCounter" : 13,
         "msgRateRedeliver" : 0.0,
         "chuckedMessageRate" : 0,
         "msgBacklog" : 799,
         "backlogSize" : 0,
         "msgBacklogNoDelayed" : 799,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 13,
         "type" : "Key_Shared",
         "msgRateExpired" : 0.0,
         "totalMsgExpired" : 0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 1617848047235,
         "lastConsumedTimestamp" : 1617848048048,
         "lastAckedTimestamp" : 0,
         "lastMarkDeleteAdvancedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 835,
           "msgOutCounter" : 13,
           "msgRateRedeliver" : 0.0,
           "chuckedMessageRate" : 0.0,
           "consumerName" : "5382f",
           "availablePermits" : 987,
           "unackedMessages" : 13,
           "avgMessagesPerEntry" : 255,
           "blockedConsumerOnUnackedMsgs" : false,
           "readPositionWhenJoining" : "133:0",
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 1617848048048,
           "keyHashRanges" : [ "[32769, 65536]" ],
           "metadata" : { },
           "connectedSince" : "2021-04-08T10:13:47.108+08:00",
           "clientVersion" : "2.7.1",
           "address" : "/10.89.133.11:24054"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 0,
           "msgOutCounter" : 0,
           "msgRateRedeliver" : 0.0,
           "chuckedMessageRate" : 0.0,
           "consumerName" : "f1013",
           "availablePermits" : 1000,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 1000,
           "blockedConsumerOnUnackedMsgs" : false,
           "readPositionWhenJoining" : "133:12",
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 0,
           "keyHashRanges" : [ "[0, 32768]" ],
           "metadata" : { },
           "connectedSince" : "2021-04-08T10:14:07.231+08:00",
           "clientVersion" : "2.7.1",
           "address" : "/10.89.133.11:24112"
         } ],
         "isDurable" : true,
         "isReplicated" : false,
         "consumersAfterMarkDeletePosition" : {
           "f1013" : "133:12"
         },
         "nonContiguousDeletedMessagesRanges" : 0,
         "nonContiguousDeletedMessagesRangesSerializedSize" : 0
       }
     },
     "replication" : { },
     "deduplicationStatus" : "Disabled",
     "nonContiguousDeletedMessagesRanges" : 0,
     "nonContiguousDeletedMessagesRangesSerializedSize" : 0
   }
   ```
   
   stats-internal:
   ```
   {
     "entriesAddedCounter" : 841,
     "numberOfEntries" : 841,
     "totalSize" : 56109,
     "currentLedgerEntries" : 841,
     "currentLedgerSize" : 56109,
     "lastLedgerCreatedTimestamp" : "2021-04-08T10:13:47.087+08:00",
     "waitingCursorsCount" : 0,
     "pendingAddEntriesCount" : 0,
     "lastConfirmedEntry" : "133:840",
     "state" : "LedgerOpened",
     "ledgers" : [ {
       "ledgerId" : 133,
       "entries" : 0,
       "size" : 0,
       "offloaded" : false
     } ],
     "cursors" : {
       "example-sub" : {
         "markDeletePosition" : "133:-1",
         "readPosition" : "133:14",
         "waitingReadOp" : false,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 0,
         "cursorLedger" : 135,
         "cursorLedgerLastEntry" : 0,
         "individuallyDeletedMessages" : "[]",
         "lastLedgerSwitchTimestamp" : "2021-04-08T10:13:47.097+08:00",
         "state" : "Open",
         "numberOfEntriesSinceFirstNotAckedMessage" : 15,
         "totalNonContiguousDeletedMessagesRange" : 0,
         "properties" : { }
       }
     },
     "schemaLedgers" : [ {
       "ledgerId" : 134,
       "entries" : 1,
       "size" : 98,
       "offloaded" : false
     } ],
     "compactedLedger" : {
       "ledgerId" : -1,
       "entries" : -1,
       "size" : -1,
       "offloaded" : false
     }
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to