aloyszhang opened a new issue #11612:
URL: https://github.com/apache/pulsar/issues/11612


   **Describe the bug**
   In our production environment, setting `startMessageId` for a reader
   (created by the Flink source) sometimes has no effect.
   After troubleshooting, we found the root cause: the reader reuses the name of
   an existing subscription and therefore misuses that Durable Subscription
   object, so it reads messages from the Durable Subscription's `readPosition`
   instead of from `startMessageId`.
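
The behavior described above can be pictured with a small toy model. This is only a sketch under stated assumptions: `CursorReuseSketch` and its methods are hypothetical names invented for illustration, positions are plain list indexes, and none of this is Pulsar broker code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical class, NOT the Pulsar broker implementation.
class CursorReuseSketch {
    // Durable cursors keyed by subscription name; the value is the index of
    // the next message to deliver for that subscription.
    private final Map<String, Integer> durableCursors = new HashMap<>();
    private final List<String> messages = new ArrayList<>();

    void publish(String message) {
        messages.add(message);
    }

    // A durable consumer acknowledges `count` messages, advancing its cursor.
    void consumeAndAck(String subscriptionName, int count) {
        int position = durableCursors.getOrDefault(subscriptionName, 0);
        durableCursors.put(subscriptionName, Math.min(position + count, messages.size()));
    }

    // A reader asks to start at `startIndex`, but if a durable cursor with the
    // same name already exists, that cursor's position wins (the reported bug).
    List<String> read(String subscriptionName, int startIndex) {
        int from = durableCursors.getOrDefault(subscriptionName, startIndex);
        return new ArrayList<>(messages.subList(from, messages.size()));
    }

    public static void main(String[] args) {
        CursorReuseSketch topic = new CursorReuseSketch();
        for (int i = 0; i < 10; i++) {
            topic.publish("message" + i);
        }
        topic.consumeAndAck("mix-subscription", 5);
        // Reusing the durable subscription's name: the start index 0 is ignored.
        System.out.println(topic.read("mix-subscription", 0).size()); // prints 5
        // A name with no durable cursor honours the requested start index.
        System.out.println(topic.read("fresh-reader", 0).size());     // prints 10
    }
}
```

In this model the requested start position is only a fallback, which matches the symptom in the report: the reader silently continues from where the durable subscription left off.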
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a topic and set up a producer and a consumer
   ```java
   String topicName = "t/ns/topic";
   Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
           .topic(topicName)
           .create();
   // Durable subscription whose name the reader in step 4 reuses
   Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
           .topic(topicName)
           .readCompacted(true)
           .subscriptionMode(SubscriptionMode.Durable)
           .subscriptionType(SubscriptionType.Exclusive)
           .subscriptionName("mix-subscription")
           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
           .subscribe();
   ```
   2. Send 10 messages
   ```java
   int messageNum = 10;
   for (int i = 0; i < messageNum; i++) {
       producer.send("message" + i);
   }
   ```
         
   3. Receive and acknowledge the first 5 messages with this consumer, then close it
   ```java
   for (int i = 0; i < 5; i++) {
       Message<String> message = consumer.receive();
       Assert.assertNotNull(message);
       Assert.assertEquals(message.getValue(), "message" + i);
       consumer.acknowledge(message);
   }
   consumer.close();
   ```
   4. Set up a reader with the same subscription name, starting from the earliest message
   ```java
   Reader<String> reader = pulsarClient.newReader(Schema.STRING)
           .topic(topicName)
           .subscriptionName("mix-subscription")
           .startMessageId(MessageId.earliest)
           .create();
   ```
   5. Read messages with this reader; only five messages can be read
   ```java
   Message<String> message;
   int readCount = 0;
   while ((message = reader.readNext(3, TimeUnit.SECONDS)) != null) {
       log.info("###Message id : {}", message.getMessageId());
       readCount++;
   }
   // Only 5 of the 10 messages are read, although the reader asked for MessageId.earliest
   Assert.assertEquals(readCount, 5);
   ```
   
   
   **Expected behavior**
   A reader should be forbidden from using the same subscriptionName as an existing durable subscription.
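
The requested behavior could look roughly like the guard below. This is a minimal sketch, not a proposed patch: `ReaderNameGuardSketch`, its methods, and the exception type are all hypothetical, and the issue does not say where or how such a check would actually be placed in Pulsar.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical guard for illustration only; NOT Pulsar code.
class ReaderNameGuardSketch {
    // Names of durable subscriptions already present on the topic.
    private final Set<String> durableSubscriptions = new HashSet<>();

    void registerDurableSubscription(String name) {
        durableSubscriptions.add(name);
    }

    // Rejects a reader whose subscription name collides with a durable subscription.
    void checkReaderSubscriptionName(String name) {
        if (durableSubscriptions.contains(name)) {
            throw new IllegalStateException(
                    "Subscription '" + name + "' is durable and cannot be used by a reader");
        }
    }

    public static void main(String[] args) {
        ReaderNameGuardSketch guard = new ReaderNameGuardSketch();
        guard.registerDurableSubscription("mix-subscription");
        guard.checkReaderSubscriptionName("fresh-reader"); // passes silently
        try {
            guard.checkReaderSubscriptionName("mix-subscription");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here would turn the silent "wrong start position" symptom into an explicit error at reader creation time.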
   

