This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ac85b65  Fix Go Producer produce to only one partition (#205)
ac85b65 is described below

commit ac85b657a850a9c124bea9f4759cfc59e7559ebc
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Mar 23 15:58:22 2020 -0700

    Fix Go Producer produce to only one partition (#205)
    
    * Fix Go Producer produce to only one partition
---
 pulsar/consumer_test.go                |  5 +--
 pulsar/internal/default_router.go      | 12 ++++--
 pulsar/internal/default_router_test.go | 24 ++++++++++-
 pulsar/producer_impl.go                | 12 +++++-
 pulsar/producer_partition.go           |  2 -
 pulsar/producer_test.go                | 73 ++++++++++++++++++++++++++++++++--
 pulsar/test_helper.go                  |  4 +-
 7 files changed, 115 insertions(+), 17 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 488ca59..05fab87 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -21,7 +21,6 @@ import (
        "context"
        "fmt"
        "log"
-       "net/http"
        "testing"
        "time"
 
@@ -327,7 +326,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        topic := "persistent://public/default/testGetPartitions"
        testURL := adminURL + "/" + 
"admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-       makeHTTPCall(t, http.MethodPut, testURL, "64")
+       makeHTTPCall(t, testURL, "64")
 
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
@@ -411,7 +410,7 @@ func TestConsumerShared(t *testing.T) {
        topic := "persistent://public/default/testMultiPartitionConsumerShared"
        testURL := adminURL + "/" + 
"admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
 
-       makeHTTPCall(t, http.MethodPut, testURL, "3")
+       makeHTTPCall(t, testURL, "3")
 
        sub := "sub-shared-1"
        consumer1, err := client.Subscribe(ConsumerOptions{
diff --git a/pulsar/internal/default_router.go 
b/pulsar/internal/default_router.go
index e4d31ca..2cf4a90 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -19,6 +19,7 @@ package internal
 
 import (
        "math/rand"
+       "sync/atomic"
        "time"
 )
 
@@ -27,6 +28,7 @@ type defaultRouter struct {
        shiftIdx         uint32
        maxBatchingDelay time.Duration
        hashFunc         func(string) uint32
+       msgCounter       uint32
 }
 
 type Clock func() uint64
@@ -41,12 +43,13 @@ func NewSystemClock() Clock {
 // NewDefaultRouter set the message routing mode for the partitioned producer.
 // Default routing mode is round-robin routing.
 func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
-       maxBatchingDelay time.Duration) func(string, uint32) int {
+       maxBatchingDelay time.Duration, disableBatching bool) func(string, 
uint32) int {
        state := &defaultRouter{
                clock:            clock,
                shiftIdx:         rand.Uint32(),
                maxBatchingDelay: maxBatchingDelay,
                hashFunc:         hashFunc,
+               msgCounter:       0,
        }
 
        return func(key string, numPartitions uint32) int {
@@ -65,10 +68,13 @@ func NewDefaultRouter(clock Clock, hashFunc func(string) 
uint32,
                // of batching of the messages.
                //
                //currentMs / maxBatchingDelayMs + startPtnIdx
-               if maxBatchingDelay.Nanoseconds() != 0 {
+               if !disableBatching && maxBatchingDelay.Nanoseconds() > 0 {
                        n := 
uint32(state.clock()/uint64(maxBatchingDelay.Nanoseconds())) + state.shiftIdx
                        return int(n % numPartitions)
                }
-               return 0
+
+               p := int(state.msgCounter % numPartitions)
+               atomic.AddUint32(&state.msgCounter, 1)
+               return p
        }
 }
diff --git a/pulsar/internal/default_router_test.go 
b/pulsar/internal/default_router_test.go
index 7dc3e0b..980df92 100644
--- a/pulsar/internal/default_router_test.go
+++ b/pulsar/internal/default_router_test.go
@@ -30,7 +30,7 @@ func TestDefaultRouter(t *testing.T) {
 
        router := NewDefaultRouter(func() uint64 {
                return currentClock
-       }, JavaStringHash, 10*time.Nanosecond)
+       }, JavaStringHash, 10*time.Nanosecond, false)
 
        // partition index should not change with time
        p1 := router("my-key", 100)
@@ -63,11 +63,31 @@ func TestDefaultRouter(t *testing.T) {
        currentClock = 112
        pr6 := router("", 100)
        assert.Equal(t, pr5, pr6)
+
+       // test batching delay is 0
+       router = NewDefaultRouter(func() uint64 {
+               return currentClock
+       }, JavaStringHash, 0, true)
+
+       // should round robin partitions
+       for i := 0; i < 200; i++ {
+               assert.Equal(t, router("", 100), i%100)
+       }
+
+       // test batching is disabled
+       router = NewDefaultRouter(func() uint64 {
+               return currentClock
+       }, JavaStringHash, 10*time.Nanosecond, true)
+
+       // should round robin partitions
+       for i := 0; i < 200; i++ {
+               assert.Equal(t, router("", 100), i%100)
+       }
 }
 
 func TestDefaultRouterNoPartitions(t *testing.T) {
 
-       router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 
10*time.Nanosecond)
+       router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 
10*time.Nanosecond, false)
 
        // partition index should not change with time
        p1 := router("", 1)
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 83a6f75..67d0360 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
 )
@@ -30,6 +31,8 @@ type producer struct {
        messageRouter func(*ProducerMessage, TopicMetadata) int
 }
 
+const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
+
 func getHashingFunction(s HashingScheme) func(string) uint32 {
        switch s {
        case JavaStringHash:
@@ -51,11 +54,18 @@ func newProducer(client *client, options *ProducerOptions) 
(*producer, error) {
                client: client,
        }
 
+       var batchingMaxPublishDelay time.Duration
+       if options.BatchingMaxPublishDelay != 0 {
+               batchingMaxPublishDelay = options.BatchingMaxPublishDelay
+       } else {
+               batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
+       }
+
        if options.MessageRouter == nil {
                internalRouter := internal.NewDefaultRouter(
                        internal.NewSystemClock(),
                        getHashingFunction(options.HashingScheme),
-                       options.BatchingMaxPublishDelay)
+                       batchingMaxPublishDelay, options.DisableBatching)
                p.messageRouter = func(message *ProducerMessage, metadata 
TopicMetadata) int {
                        return internalRouter(message.Key, 
metadata.NumPartitions())
                }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f10adcf..58f512f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -64,8 +64,6 @@ type partitionProducer struct {
        partitionIdx int
 }
 
-const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
-
 func newPartitionProducer(client *client, topic string, options 
*ProducerOptions, partitionIdx int) (
        *partitionProducer, error) {
        var batchingMaxPublishDelay time.Duration
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66fa8a0..50eea77 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,7 +20,7 @@ package pulsar
 import (
        "context"
        "fmt"
-       "net/http"
+       "strconv"
        "sync"
        "testing"
        "time"
@@ -351,11 +351,11 @@ func TestFlushInProducer(t *testing.T) {
 }
 
 func TestFlushInPartitionedProducer(t *testing.T) {
-       topicName := 
"persistent://public/default/partition-testFlushInPartitionedProducer"
+       topicName := "public/default/partition-testFlushInPartitionedProducer"
 
        // call admin api to make it partitioned
-       url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
-       makeHTTPCall(t, http.MethodPut, url, "5")
+       url := adminURL + "/" + "admin/v2/persistent/" + topicName + 
"/partitions"
+       makeHTTPCall(t, url, "5")
 
        numberOfPartitions := 5
        numOfMessages := 10
@@ -427,6 +427,71 @@ func TestFlushInPartitionedProducer(t *testing.T) {
        assert.Equal(t, msgCount, numOfMessages/2)
 }
 
+func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
+       topicName := 
"public/default/partition-testRoundRobinRouterPartitionedProducer"
+       numberOfPartitions := 5
+
+       // call admin api to make it partitioned
+       url := adminURL + "/" + "admin/v2/persistent/" + topicName + 
"/partitions"
+       makeHTTPCall(t, url, strconv.Itoa(numberOfPartitions))
+
+       numOfMessages := 10
+       ctx := context.Background()
+
+       // create client connection
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "my-sub",
+               Type:             Exclusive,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send numOfMessages messages
+       prefix := "msg-"
+
+       for i := 0; i < numOfMessages; i++ {
+               messageContent := prefix + fmt.Sprintf("%d", i)
+               _, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(messageContent),
+               })
+               assert.Nil(t, err)
+       }
+
+       // Receive all messages
+       msgCount := 0
+       msgPartitionMap := make(map[string]int)
+       for i := 0; i < numOfMessages; i++ {
+               msg, err := consumer.Receive(ctx)
+               fmt.Printf("Received message msgId: %#v topic: %s-- content: 
'%s'\n",
+                       msg.ID(), msg.Topic(), string(msg.Payload()))
+               assert.Nil(t, err)
+               consumer.Ack(msg)
+               msgCount++
+               msgPartitionMap[msg.Topic()]++
+       }
+       assert.Equal(t, msgCount, numOfMessages)
+       assert.Equal(t, numberOfPartitions, len(msgPartitionMap))
+       for _, count := range msgPartitionMap {
+               assert.Equal(t, count, numOfMessages/numberOfPartitions)
+       }
+}
+
 func TestMessageRouter(t *testing.T) {
        // Create topic with 5 partitions
        err := 
httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 3471bca..9d54f20 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -118,10 +118,10 @@ func httpDo(method string, requestPath string, in 
interface{}, out interface{})
        return nil
 }
 
-func makeHTTPCall(t *testing.T, method string, url string, body string) {
+func makeHTTPCall(t *testing.T, url string, body string) {
        client := http.Client{}
 
-       req, err := http.NewRequest(method, url, strings.NewReader(body))
+       req, err := http.NewRequest(http.MethodPut, url, 
strings.NewReader(body))
        if err != nil {
                t.Fatal(err)
        }

Reply via email to