wolfstudy commented on a change in pull request #314: URL: https://github.com/apache/pulsar-client-go/pull/314#discussion_r451932214
########## File path: pulsar/consumer_test.go ########## @@ -1342,3 +1344,130 @@ func TestProducerName(t *testing.T) { consumer.Ack(msg) } } + +type noopConsumerInterceptor struct{} + +func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} + +func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} + +// copyPropertyInterceptor copy all keys in message properties map and add a suffix +type copyPropertyInterceptor struct { + suffix string +} + +func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) { + properties := message.Properties() + copy := make(map[string]string, len(properties)) + for k, v := range properties { + copy[k+x.suffix] = v + } + for ck, v := range copy { + properties[ck] = v + } +} + +func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} + +func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} + +type metricConsumerInterceptor struct { + ackn int32 + nackn int32 +} + +func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) { + atomic.AddInt32(&x.ackn, 1) +} + +func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { + atomic.AddInt32(&x.nackn, int32(len(msgIDs))) +} + +func TestConsumerWithInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + rand.Seed(time.Now().Unix()) + topic := fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", rand.Int()) + ctx := context.Background() + + metric := &metricConsumerInterceptor{} + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + 
NackRedeliveryDelay: time.Second, // for testing nack + Interceptors: ConsumerInterceptors{ + noopConsumerInterceptor{}, + copyPropertyInterceptor{suffix: "-copy"}, + metric, + }, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + var nackIds []MessageID + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + "key-1-copy": "pulsar-1", // check properties copy by interceptor + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + if i%2 == 0 { + consumer.Ack(msg) + } else { + nackIds = append(nackIds, msg.ID()) + } + } + assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn)) + + for i := range nackIds { + consumer.NackID(nackIds[i]) + } + + time.Sleep(time.Second * 3) // waiting for nack actual perform Review comment: Please avoid using `time.Sleep` in the test: the state of the server at any given moment is not deterministic, so a fixed delay can make the test flaky. 
########## File path: pulsar/consumer_test.go ########## @@ -1342,3 +1344,130 @@ func TestProducerName(t *testing.T) { consumer.Ack(msg) } } + +type noopConsumerInterceptor struct{} + +func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} + +func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} + +// copyPropertyInterceptor copy all keys in message properties map and add a suffix +type copyPropertyInterceptor struct { + suffix string +} + +func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) { + properties := message.Properties() + copy := make(map[string]string, len(properties)) + for k, v := range properties { + copy[k+x.suffix] = v + } + for ck, v := range copy { + properties[ck] = v + } +} + +func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} + +func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} + +type metricConsumerInterceptor struct { + ackn int32 + nackn int32 +} + +func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) { + atomic.AddInt32(&x.ackn, 1) +} + +func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { + atomic.AddInt32(&x.nackn, int32(len(msgIDs))) +} + +func TestConsumerWithInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + rand.Seed(time.Now().Unix()) + topic := fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", rand.Int()) Review comment: Maybe we can use `newTopicName()` to create a random topic name. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org