This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ec846ff0 [fix] When topic is terminated. Client must not retry 
connecting. Pending messages should be failed (#1128)
ec846ff0 is described below

commit ec846ff012d425770e0a88755b0aa883ad63cbd8
Author: Prashant Kumar <65131575+pkumar-si...@users.noreply.github.com>
AuthorDate: Wed Nov 15 18:16:15 2023 -0800

    [fix] When topic is terminated. Client must not retry connecting. Pending 
messages should be failed (#1128)
    
    ### Motivation
    The Go Pulsar client library has no support for topic termination.
    When a topic is terminated, the following should happen on the client side:
    1. Producers should stop reconnecting, because once a topic is terminated
    the termination is permanent.
    2. All the pending messages should be failed.
    
    
    ### Modifications
    If a reconnect attempt fails with a TopicTerminated error, run through the
    pending message queue and complete each pending callback with a
    TopicTerminated error.
    After that, exit the reconnect loop and set the producer state to closing.
    Marking the producer state as producerClosing ensures that new messages
    are failed immediately.
    
    
    Co-authored-by: Prashant Kumar <prasha...@splunk.com>
---
 pulsar/producer_partition.go | 21 +++++++++++++++++
 pulsar/producer_test.go      | 56 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f076eefe..65eef5b6 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -447,6 +447,27 @@ func (p *partitionProducer) reconnectToBroker() {
                        break
                }
 
+               if strings.Contains(errMsg, "TopicTerminatedError") {
+                       p.log.Info("Topic was terminated, failing pending 
messages, will not reconnect")
+                       pendingItems := p.pendingQueue.ReadableSlice()
+                       for _, item := range pendingItems {
+                               pi := item.(*pendingItem)
+                               if pi != nil {
+                                       pi.Lock()
+                                       requests := pi.sendRequests
+                                       for _, req := range requests {
+                                               sr := req.(*sendRequest)
+                                               if sr != nil {
+                                                       sr.done(nil, 
newError(TopicTerminated, err.Error()))
+                                               }
+                                       }
+                                       pi.Unlock()
+                               }
+                       }
+                       p.setProducerState(producerClosing)
+                       break
+               }
+
                if maxRetry > 0 {
                        maxRetry--
                }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 49e225f3..f30ae65f 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1158,6 +1158,62 @@ func TestFailedSchemaEncode(t *testing.T) {
        wg.Wait()
 }
 
+func TestTopicTermination(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "send_timeout_sub",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close() // subscribe but do nothing
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:       topicName,
+               SendTimeout: 2 * time.Second,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       afterCh := time.After(5 * time.Second)
+       terminatedChan := make(chan bool)
+       go func() {
+               for {
+                       _, err := producer.Send(context.Background(), 
&ProducerMessage{
+                               Payload: make([]byte, 1024),
+                       })
+                       if err != nil {
+                               e := err.(*Error)
+                               if e.result == TopicTerminated {
+                                       terminatedChan <- true
+                               } else {
+                                       terminatedChan <- false
+                               }
+                       }
+                       time.Sleep(1 * time.Millisecond)
+               }
+       }()
+
+       terminateURL := adminURL + "/admin/v2/persistent/public/default/" + 
topicName + "/terminate"
+       log.Info(terminateURL)
+       makeHTTPCall(t, http.MethodPost, terminateURL, "")
+
+       for {
+               select {
+               case d := <-terminatedChan:
+                       assert.Equal(t, d, true)
+                       return
+               case <-afterCh:
+                       assert.Fail(t, "Time is up. Topic should have been 
terminated by now")
+               }
+       }
+}
+
 func TestSendTimeout(t *testing.T) {
        quotaURL := adminURL + 
"/admin/v2/namespaces/public/default/backlogQuota"
        quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`

Reply via email to