This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new ac85b65 Fix Go Producer produce to only one partition (#205) ac85b65 is described below commit ac85b657a850a9c124bea9f4759cfc59e7559ebc Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Mar 23 15:58:22 2020 -0700 Fix Go Producer produce to only one partition (#205) * Fix Go Producer produce to only one partition --- pulsar/consumer_test.go | 5 +-- pulsar/internal/default_router.go | 12 ++++-- pulsar/internal/default_router_test.go | 24 ++++++++++- pulsar/producer_impl.go | 12 +++++- pulsar/producer_partition.go | 2 - pulsar/producer_test.go | 73 ++++++++++++++++++++++++++++++++-- pulsar/test_helper.go | 4 +- 7 files changed, 115 insertions(+), 17 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 488ca59..05fab87 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "log" - "net/http" "testing" "time" @@ -327,7 +326,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { topic := "persistent://public/default/testGetPartitions" testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions" - makeHTTPCall(t, http.MethodPut, testURL, "64") + makeHTTPCall(t, testURL, "64") // create producer producer, err := client.CreateProducer(ProducerOptions{ @@ -411,7 +410,7 @@ func TestConsumerShared(t *testing.T) { topic := "persistent://public/default/testMultiPartitionConsumerShared" testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions" - makeHTTPCall(t, http.MethodPut, testURL, "3") + makeHTTPCall(t, testURL, "3") sub := "sub-shared-1" consumer1, err := client.Subscribe(ConsumerOptions{ diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go index e4d31ca..2cf4a90 100644 --- a/pulsar/internal/default_router.go +++ b/pulsar/internal/default_router.go @@ -19,6 +19,7 @@ package internal import ( "math/rand" + "sync/atomic" "time" ) @@ -27,6 +28,7 @@ type defaultRouter struct { shiftIdx uint32 maxBatchingDelay time.Duration hashFunc func(string) uint32 + msgCounter uint32 } type Clock func() uint64 @@ -41,12 +43,13 @@ func NewSystemClock() Clock { // NewDefaultRouter set the message routing mode for the partitioned producer. // Default routing mode is round-robin routing. func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, - maxBatchingDelay time.Duration) func(string, uint32) int { + maxBatchingDelay time.Duration, disableBatching bool) func(string, uint32) int { state := &defaultRouter{ clock: clock, shiftIdx: rand.Uint32(), maxBatchingDelay: maxBatchingDelay, hashFunc: hashFunc, + msgCounter: 0, } return func(key string, numPartitions uint32) int { @@ -65,10 +68,13 @@ func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, // of batching of the messages. // //currentMs / maxBatchingDelayMs + startPtnIdx - if maxBatchingDelay.Nanoseconds() != 0 { + if !disableBatching && maxBatchingDelay.Nanoseconds() > 0 { n := uint32(state.clock()/uint64(maxBatchingDelay.Nanoseconds())) + state.shiftIdx return int(n % numPartitions) } - return 0 + + p := int(state.msgCounter % numPartitions) + atomic.AddUint32(&state.msgCounter, 1) + return p } } diff --git a/pulsar/internal/default_router_test.go b/pulsar/internal/default_router_test.go index 7dc3e0b..980df92 100644 --- a/pulsar/internal/default_router_test.go +++ b/pulsar/internal/default_router_test.go @@ -30,7 +30,7 @@ func TestDefaultRouter(t *testing.T) { router := NewDefaultRouter(func() uint64 { return currentClock - }, JavaStringHash, 10*time.Nanosecond) + }, JavaStringHash, 10*time.Nanosecond, false) // partition index should not change with time p1 := router("my-key", 100) @@ -63,11 +63,31 @@ func TestDefaultRouter(t *testing.T) { currentClock = 112 pr6 := router("", 100) assert.Equal(t, pr5, pr6) + + // test batching delay is 0 + router = NewDefaultRouter(func() uint64 { + return currentClock + }, JavaStringHash, 0, true) + + // should round robin partitions + for i := 0; i < 200; i++ { + assert.Equal(t, router("", 100), i%100) + } + + // test batching is disabled + router = NewDefaultRouter(func() uint64 { + return currentClock + }, JavaStringHash, 10*time.Nanosecond, true) + + // should round robin partitions + for i := 0; i < 200; i++ { + assert.Equal(t, router("", 100), i%100) + } } func TestDefaultRouterNoPartitions(t *testing.T) { - router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond) + router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond, false) // partition index should not change with time p1 := router("", 1) diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 83a6f75..67d0360 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "time" "github.com/apache/pulsar-client-go/pulsar/internal" ) @@ -30,6 +31,8 @@ type producer struct { messageRouter func(*ProducerMessage, TopicMetadata) int } +const defaultBatchingMaxPublishDelay = 10 * time.Millisecond + func getHashingFunction(s HashingScheme) func(string) uint32 { switch s { case JavaStringHash: @@ -51,11 +54,18 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { client: client, } + var batchingMaxPublishDelay time.Duration + if options.BatchingMaxPublishDelay != 0 { + batchingMaxPublishDelay = options.BatchingMaxPublishDelay + } else { + batchingMaxPublishDelay = defaultBatchingMaxPublishDelay + } + if options.MessageRouter == nil { internalRouter := internal.NewDefaultRouter( internal.NewSystemClock(), getHashingFunction(options.HashingScheme), - options.BatchingMaxPublishDelay) + batchingMaxPublishDelay, options.DisableBatching) p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int { return internalRouter(message.Key, metadata.NumPartitions()) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f10adcf..58f512f 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -64,8 +64,6 @@ type partitionProducer struct { partitionIdx int } -const defaultBatchingMaxPublishDelay = 10 * time.Millisecond - func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) ( *partitionProducer, error) { var batchingMaxPublishDelay time.Duration diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 66fa8a0..50eea77 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -20,7 +20,7 @@ package pulsar import ( "context" "fmt" - "net/http" + "strconv" "sync" "testing" "time" @@ -351,11 +351,11 @@ func TestFlushInProducer(t *testing.T) { } func TestFlushInPartitionedProducer(t *testing.T) { - topicName := "persistent://public/default/partition-testFlushInPartitionedProducer" + topicName := "public/default/partition-testFlushInPartitionedProducer" // call admin api to make it partitioned - url := adminURL + "/" + "admin/v2/" + topicName + "/partitions" - makeHTTPCall(t, http.MethodPut, url, "5") + url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions" + makeHTTPCall(t, url, "5") numberOfPartitions := 5 numOfMessages := 10 @@ -427,6 +427,71 @@ func TestFlushInPartitionedProducer(t *testing.T) { assert.Equal(t, msgCount, numOfMessages/2) } +func TestRoundRobinRouterPartitionedProducer(t *testing.T) { + topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer" + numberOfPartitions := 5 + + // call admin api to make it partitioned + url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions" + makeHTTPCall(t, url, strconv.Itoa(numberOfPartitions)) + + numOfMessages := 10 + ctx := context.Background() + + // creat client connection + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "my-sub", + Type: Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 5 messages + prefix := "msg-" + + for i := 0; i < numOfMessages; i++ { + messageContent := prefix + fmt.Sprintf("%d", i) + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(messageContent), + }) + assert.Nil(t, err) + } + + // Receive all messages + msgCount := 0 + msgPartitionMap := make(map[string]int) + for i := 0; i < numOfMessages; i++ { + msg, err := consumer.Receive(ctx) + fmt.Printf("Received message msgId: %#v topic: %s-- content: '%s'\n", + msg.ID(), msg.Topic(), string(msg.Payload())) + assert.Nil(t, err) + consumer.Ack(msg) + msgCount++ + msgPartitionMap[msg.Topic()]++ + } + assert.Equal(t, msgCount, numOfMessages) + assert.Equal(t, numberOfPartitions, len(msgPartitionMap)) + for _, count := range msgPartitionMap { + assert.Equal(t, count, numOfMessages/numberOfPartitions) + } +} + func TestMessageRouter(t *testing.T) { // Create topic with 5 partitions err := httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5) diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go index 3471bca..9d54f20 100644 --- a/pulsar/test_helper.go +++ b/pulsar/test_helper.go @@ -118,10 +118,10 @@ func httpDo(method string, requestPath string, in interface{}, out interface{}) return nil } -func makeHTTPCall(t *testing.T, method string, url string, body string) { +func makeHTTPCall(t *testing.T, url string, body string) { client := http.Client{} - req, err := http.NewRequest(method, url, strings.NewReader(body)) + req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(body)) if err != nil { t.Fatal(err) }