This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new ec846ff0 [fix] When topic is terminated. Client must not retry connecting. Pending messages should be failed (#1128) ec846ff0 is described below commit ec846ff012d425770e0a88755b0aa883ad63cbd8 Author: Prashant Kumar <65131575+pkumar-si...@users.noreply.github.com> AuthorDate: Wed Nov 15 18:16:15 2023 -0800 [fix] When topic is terminated. Client must not retry connecting. Pending messages should be failed (#1128) ### Motivation The Go Pulsar client library has no support for topic termination. When a topic is terminated, the following should happen on the client library side: 1. Producers should stop reconnecting, because topic termination is permanent. 2. All pending messages should be failed. ### Modifications If a reconnect attempt fails with a TopicTerminated error, run through the pending message queue and complete each pending request's callback with that error. After that, exit the reconnect loop and set the producer state to closing. Setting the producer state to producerClosing ensures that new messages are failed immediately. 
Co-authored-by: Prashant Kumar <prasha...@splunk.com> --- pulsar/producer_partition.go | 21 +++++++++++++++++ pulsar/producer_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f076eefe..65eef5b6 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -447,6 +447,27 @@ func (p *partitionProducer) reconnectToBroker() { break } + if strings.Contains(errMsg, "TopicTerminatedError") { + p.log.Info("Topic was terminated, failing pending messages, will not reconnect") + pendingItems := p.pendingQueue.ReadableSlice() + for _, item := range pendingItems { + pi := item.(*pendingItem) + if pi != nil { + pi.Lock() + requests := pi.sendRequests + for _, req := range requests { + sr := req.(*sendRequest) + if sr != nil { + sr.done(nil, newError(TopicTerminated, err.Error())) + } + } + pi.Unlock() + } + } + p.setProducerState(producerClosing) + break + } + if maxRetry > 0 { maxRetry-- } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 49e225f3..f30ae65f 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1158,6 +1158,62 @@ func TestFailedSchemaEncode(t *testing.T) { wg.Wait() } +func TestTopicTermination(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "send_timeout_sub", + }) + assert.Nil(t, err) + defer consumer.Close() // subscribe but do nothing + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + defer producer.Close() + + afterCh := time.After(5 * time.Second) + terminatedChan := make(chan bool) + go func() { + for { + _, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 
1024), + }) + if err != nil { + e := err.(*Error) + if e.result == TopicTerminated { + terminatedChan <- true + } else { + terminatedChan <- false + } + } + time.Sleep(1 * time.Millisecond) + } + }() + + terminateURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/terminate" + log.Info(terminateURL) + makeHTTPCall(t, http.MethodPost, terminateURL, "") + + for { + select { + case d := <-terminatedChan: + assert.Equal(t, d, true) + return + case <-afterCh: + assert.Fail(t, "Time is up. Topic should have been terminated by now") + } + } +} + func TestSendTimeout(t *testing.T) { quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota" quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`