wolfstudy commented on a change in pull request #314:
URL: https://github.com/apache/pulsar-client-go/pull/314#discussion_r451932214



##########
File path: pulsar/consumer_test.go
##########
@@ -1342,3 +1344,130 @@ func TestProducerName(t *testing.T) {
                consumer.Ack(msg)
        }
 }
+
+type noopConsumerInterceptor struct{}
+
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+
+// copyPropertyInterceptor copy all keys in message properties map and add a 
suffix
+type copyPropertyInterceptor struct {
+       suffix string
+}
+
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+       properties := message.Properties()
+       copy := make(map[string]string, len(properties))
+       for k, v := range properties {
+               copy[k+x.suffix] = v
+       }
+       for ck, v := range copy {
+               properties[ck] = v
+       }
+}
+
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+
+type metricConsumerInterceptor struct {
+       ackn  int32
+       nackn int32
+}
+
+func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {
+       atomic.AddInt32(&x.ackn, 1)
+}
+
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, 
msgIDs []MessageID) {
+       atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
+}
+
+func TestConsumerWithInterceptors(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       rand.Seed(time.Now().Unix())
+       topic := 
fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", 
rand.Int())
+       ctx := context.Background()
+
+       metric := &metricConsumerInterceptor{}
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    "my-sub",
+               Type:                Exclusive,
+               NackRedeliveryDelay: time.Second, // for testing nack
+               Interceptors: ConsumerInterceptors{
+                       noopConsumerInterceptor{},
+                       copyPropertyInterceptor{suffix: "-copy"},
+                       metric,
+               },
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       var nackIds []MessageID
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1":      "pulsar-1",
+                       "key-1-copy": "pulsar-1", // check properties copy by 
interceptor
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+
+               // ack message
+               if i%2 == 0 {
+                       consumer.Ack(msg)
+               } else {
+                       nackIds = append(nackIds, msg.ID())
+               }
+       }
+       assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
+
+       for i := range nackIds {
+               consumer.NackID(nackIds[i])
+       }
+
+       time.Sleep(time.Second * 3) // waiting for nack actual perform

Review comment:
       Please avoid using `time.Sleep` in the test: the state of each server is 
uncertain, so a fixed delay may not be long enough.

##########
File path: pulsar/consumer_test.go
##########
@@ -1342,3 +1344,130 @@ func TestProducerName(t *testing.T) {
                consumer.Ack(msg)
        }
 }
+
+type noopConsumerInterceptor struct{}
+
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+
+// copyPropertyInterceptor copy all keys in message properties map and add a 
suffix
+type copyPropertyInterceptor struct {
+       suffix string
+}
+
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+       properties := message.Properties()
+       copy := make(map[string]string, len(properties))
+       for k, v := range properties {
+               copy[k+x.suffix] = v
+       }
+       for ck, v := range copy {
+               properties[ck] = v
+       }
+}
+
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+
+type metricConsumerInterceptor struct {
+       ackn  int32
+       nackn int32
+}
+
+func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {
+       atomic.AddInt32(&x.ackn, 1)
+}
+
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, 
msgIDs []MessageID) {
+       atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
+}
+
+func TestConsumerWithInterceptors(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       rand.Seed(time.Now().Unix())
+       topic := 
fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", 
rand.Int())

Review comment:
       Maybe we can use `newTopicName()` to create a random topic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to