wangjia007bond commented on a change in pull request #168: 
[Issue-148][pulsar-client-go] add seek by messageID
URL: https://github.com/apache/pulsar-client-go/pull/168#discussion_r364548048
 
 

 ##########
 File path: pulsar/consumer_test.go
 ##########
 @@ -202,192 +157,160 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 }
 
 func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
-       client, err := NewClient(ClientOptions{
-               URL: lookupURL,
-       })
-
-       assert.Nil(t, err)
+       client := createClient(t)
        defer client.Close()
 
-       topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix())
-       subName := "test-subscription-initial-earliest-position"
+       topicName := newTopicName()
 
-       // create producer
-       producer, err := client.CreateProducer(ProducerOptions{
+       producer := createProducer(t, client, ProducerOptions{
                Topic: topicName,
        })
-       assert.Nil(t, err)
        defer producer.Close()
 
-       // send message
-       ctx := context.Background()
-       _, err = producer.Send(ctx, &ProducerMessage{
-               Payload: []byte("msg-1-content-1"),
-       })
-       assert.Nil(t, err)
+       times := 2
+       msgPayload := "hello"
 
-       _, err = producer.Send(ctx, &ProducerMessage{
-               Payload: []byte("msg-1-content-2"),
+       ctx := context.Background()
+       // send before create consumer to simulate SubscriptionPositionEarliest
+       send(t, producer, ctx, times, &ProducerMessage{
+               Payload:             []byte(msgPayload),
        })
-       assert.Nil(t, err)
 
-       // create consumer
-       consumer, err := client.Subscribe(ConsumerOptions{
+       consumer := createConsumer(t, client, ConsumerOptions{
                Topic:                       topicName,
-               SubscriptionName:            subName,
+               SubscriptionName:            
"test-subscription-initial-earliest-position",
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
        })
-       assert.Nil(t, err)
        defer consumer.Close()
 
-       msg, err := consumer.Receive(ctx)
-       assert.Nil(t, err)
-
-       assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+       receive(t, consumer, ctx, times, func(t *testing.T, msg Message, i int) 
{
+               expectMsg := fmt.Sprintf("%s-%d", msgPayload, i)
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+       })
 }
 
 func TestConsumerKeyShared(t *testing.T) {
-       client, err := NewClient(ClientOptions{
-               URL: lookupURL,
-       })
-       assert.Nil(t, err)
+       client := createClient(t)
        defer client.Close()
 
-       topic := "persistent://public/default/test-topic-6"
+       topic := newTopicName()
 
-       consumer1, err := client.Subscribe(ConsumerOptions{
+       consumer1 := createConsumer(t, client, ConsumerOptions{
                Topic:            topic,
                SubscriptionName: "sub-1",
                Type:             KeyShared,
        })
-       assert.Nil(t, err)
        defer consumer1.Close()
 
-       consumer2, err := client.Subscribe(ConsumerOptions{
+       consumer2 := createConsumer(t, client, ConsumerOptions{
                Topic:            topic,
                SubscriptionName: "sub-1",
                Type:             KeyShared,
        })
-       assert.Nil(t, err)
        defer consumer2.Close()
 
-       // create producer
-       producer, err := client.CreateProducer(ProducerOptions{
+       producer := createProducer(t, client, ProducerOptions{
                Topic:           topic,
                DisableBatching: true,
        })
-       assert.Nil(t, err)
        defer producer.Close()
 
+       msgPayload := "hello"
+       msgKey := "pulsar"
+
        ctx := context.Background()
-       for i := 0; i < 10; i++ {
-               _, err := producer.Send(ctx, &ProducerMessage{
-                       Key:     fmt.Sprintf("key-shared-%d", i%3),
-                       Payload: []byte(fmt.Sprintf("value-%d", i)),
-               })
-               assert.Nil(t, err)
-       }
+       send(t, producer, ctx, 3, &ProducerMessage{
+               Payload:             []byte(fmt.Sprintf("%s-%s", msgPayload, 
"A")),
+               Key:                 fmt.Sprintf("%s-%s", msgKey, "A"),
+       })
+       send(t, producer, ctx, 3, &ProducerMessage{
+               Payload:             []byte(fmt.Sprintf("%s-%s", msgPayload, 
"B")),
+               Key:                 fmt.Sprintf("%s-%s", msgKey, "B"),
+       })
 
-       receivedConsumer1 := 0
-       receivedConsumer2 := 0
-       for (receivedConsumer1 + receivedConsumer2) < 10 {
-               select {
-               case cm, ok := <-consumer1.Chan():
-                       if !ok {
-                               break
-                       }
-                       receivedConsumer1++
-                       consumer1.Ack(cm.Message)
-               case cm, ok := <-consumer2.Chan():
-                       if !ok {
-                               break
-                       }
-                       receivedConsumer2++
-                       consumer2.Ack(cm.Message)
+       previousMsg, previousKey := "", ""
+       receive(t, consumer1, ctx, 3, func(t *testing.T, msg Message, i int) {
 
 Review comment:
   My purpose is to reduce duplicate code and unify test cases.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to