frankjkelly opened a new issue #233:
URL: https://github.com/apache/pulsar-client-go/issues/233


   #### Expected behavior
   
   Tell us what should happen:
   When creating a new topic - the consumer should be able to receive the first 
message
   
   #### Actual behavior
   
   Tell us what happens instead:
   For the first message(s) on a new topic - the consumer does not see them (or 
maybe the producer didn't actually send them). If we re-run the test with the 
same topic name now it works fine.
   
   #### Steps to reproduce
   
   How can we reproduce the issue 
   In the code below we are constantly changing the topic name and the Consumer 
just blocks forever waiting for a message. To see the change in behavior - keep 
the topic name constant and the second run passes once the topic was "created"
   
   ```
   func TestRawPulsar(t *testing.T) {
        signalId := "testTopic-" + strconv.FormatInt(time.Now().UnixNano(), 10)
        //To see the change in behavior when the topic is the same each time 
just use
        // the following line and run the test twice - first run will block 
forever, second run will work fine
        signalId = "testTopic1"
        readerClient, err := pulsar.NewClient(pulsar.ClientOptions{
                ConnectionTimeout: ConnectionTimeout,
                OperationTimeout:  OperationTimeout,
                URL:               "pulsar://localhost:6650",
        })
        if err != nil {
                t.Errorf("Error %v", err)
        }
        defer readerClient.Close()
        producer, err := readerClient.CreateProducer(pulsar.ProducerOptions{
                Topic: signalId,
        })
        if err != nil {
                t.Errorf("Error %v", err)
        }
        defer producer.Close()
   
        _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
                Payload: []byte("my message"),
        })
        if err != nil {
                t.Errorf("Error %v", err)
        }
        err = producer.Flush()
        if err != nil {
                t.Errorf("Error %v", err)
        }
   
        // PART 2
        writerClient, err := pulsar.NewClient(pulsar.ClientOptions{
                ConnectionTimeout: ConnectionTimeout,
                OperationTimeout:  OperationTimeout,
                URL:               "pulsar://localhost:6650",
        })
        if err != nil {
                t.Errorf("Error %v", err)
        }
        defer writerClient.Close()
        consumer, err := writerClient.Subscribe(pulsar.ConsumerOptions{
                Topic:            signalId,
                SubscriptionName: "Consumah",
                Type:             pulsar.Exclusive,
        })
        if err != nil {
                t.Errorf("Error %v", err)
        }
        defer consumer.Close()
   
        msg, err := consumer.Receive(context.Background())
        if err != nil {
                t.Errorf("Error %v", err)
        }
   
        log.Debugf("Received message msgId: %#v -- content: '%s'\n",
                msg.ID(), string(msg.Payload()))
        consumer.Ack(msg)
   }
   ```
   #### System configuration
   **Pulsar version**: 2.5.0 and 2.5.1
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to