frankjkelly opened a new issue #233: URL: https://github.com/apache/pulsar-client-go/issues/233
#### Expected behavior Tell us what should happen: When creating a new topic - the consumer should be able to receive the first message #### Actual behavior Tell us what happens instead: For the first message(s) on a new topic - the consumer does not see them (or maybe the producer didn't actually send them). If we re-run the test with the same topic name now it works fine. #### Steps to reproduce How can we reproduce the issue In the code below we are constantly changing the topic name and the Consumer just blocks forever waiting for a message. To see the change in behavior - keep the topic name constant and the second run passes once the topic was "created" ``` func TestRawPulsar(t *testing.T) { signalId := "testTopic-" + strconv.FormatInt(time.Now().UnixNano(), 10) //To see the change in behavior when the topic is the same each time just use // the following line and run the test twice - first run will block forever, second run will work fine signalId = "testTopic1" readerClient, err := pulsar.NewClient(pulsar.ClientOptions{ ConnectionTimeout: ConnectionTimeout, OperationTimeout: OperationTimeout, URL: "pulsar://localhost:6650", }) if err != nil { t.Errorf("Error %v", err) } defer readerClient.Close() producer, err := readerClient.CreateProducer(pulsar.ProducerOptions{ Topic: signalId, }) if err != nil { t.Errorf("Error %v", err) } defer producer.Close() _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ Payload: []byte("my message"), }) if err != nil { t.Errorf("Error %v", err) } err = producer.Flush() if err != nil { t.Errorf("Error %v", err) } // PART 2 writerClient, err := pulsar.NewClient(pulsar.ClientOptions{ ConnectionTimeout: ConnectionTimeout, OperationTimeout: OperationTimeout, URL: "pulsar://localhost:6650", }) if err != nil { t.Errorf("Error %v", err) } defer writerClient.Close() consumer, err := writerClient.Subscribe(pulsar.ConsumerOptions{ Topic: signalId, SubscriptionName: "Consumah", Type: pulsar.Exclusive, }) if err != nil { t.Errorf("Error %v", err) } defer consumer.Close() msg, err := consumer.Receive(context.Background()) if err != nil { t.Errorf("Error %v", err) } log.Debugf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } ``` #### System configuration **Pulsar version**: 2.5.0 and 2.5.1 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org