vongosling closed pull request #4: Send orderly
URL: https://github.com/apache/rocketmq-client-go/pull/4
 
 
   

This pull request was merged from a forked repository (a foreign pull
request). Because GitHub hides the original diff of such a pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/api.go b/core/api.go
index 92619f3..0f4b6f9 100644
--- a/core/api.go
+++ b/core/api.go
@@ -29,7 +29,7 @@ func NewProduer(config *ProducerConfig) Producer {
 
 // ProducerConfig define a producer
 type ProducerConfig struct {
-       GroupID         string
+       GroupID     string
        NameServer  string
        Credentials *SessionCredentials
 }
@@ -44,6 +44,9 @@ type Producer interface {
        // SendMessageSync send a message with sync
        SendMessageSync(msg *Message) SendResult
 
+       // SendMessageOrderly send the message orderly
+       SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+
        // SendMessageAsync send a message with async
        SendMessageAsync(msg *Message)
 }
diff --git a/core/producer.go b/core/producer.go
index 3d7e022..3cccf21 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -16,12 +16,24 @@
  */
 package rocketmq
 
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CProducer.h"
-//#include "rocketmq/CSendResult.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+
+#include <stdio.h>
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CProducer.h"
+#include "rocketmq/CSendResult.h"
+
+int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
+       int queueSelectorCallback(int, void*);
+       return queueSelectorCallback(size, selectorKey);
+}
+*/
 import "C"
-import "fmt"
+import (
+       "fmt"
+       "unsafe"
+)
 
 type SendStatus int
 
@@ -77,7 +89,7 @@ func (p *defaultProducer) String() string {
 func (p *defaultProducer) Start() error {
        err := int(C.StartProducer(p.cproduer))
        // TODO How to process err code.
-       fmt.Printf("result: %v \n", err)
+       fmt.Printf("producer start result: %v \n", err)
        return nil
 }
 
@@ -87,7 +99,7 @@ func (p *defaultProducer) Shutdown() error {
        err := C.ShutdownProducer(p.cproduer)
 
        // TODO How to process err code.
-       fmt.Printf("result: %v \n", err)
+       fmt.Printf("shutdown result: %v \n", err)
        return nil
 }
 
@@ -105,6 +117,28 @@ func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
        return result
 }
 
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+       cmsg := goMsgToC(msg)
+       key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
+
+       var sr C.struct__SendResult_
+       C.SendMessageOrderly(
+               p.cproduer,
+               cmsg,
+               (C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
+               unsafe.Pointer(&key),
+               C.int(autoRetryTimes),
+               &sr,
+       )
+       C.DestroyMessage(cmsg)
+
+       return SendResult{
+               Status: SendStatus(sr.sendStatus),
+               MsgId:  C.GoString(&sr.msgId[0]),
+               Offset: int64(sr.offset),
+       }
+}
+
 func (p *defaultProducer) SendMessageAsync(msg *Message) {
        // TODO
 }
diff --git a/core/queue_selector.go b/core/queue_selector.go
new file mode 100644
index 0000000..311c378
--- /dev/null
+++ b/core/queue_selector.go
@@ -0,0 +1,62 @@
+package rocketmq
+
+import "C"
+import (
+       "strconv"
+       "sync"
+       "unsafe"
+)
+
+var selectors = selectorHolder{selectors: map[int]*messageQueueSelectorWrapper{}}
+
+//export queueSelectorCallback
+func queueSelectorCallback(size int, selectorKey unsafe.Pointer) int {
+       s, ok := selectors.getAndDelete(*(*int)(selectorKey))
+       if !ok {
+               panic("BUG: not register the selector with key:" + strconv.Itoa(*(*int)(selectorKey)))
+       }
+       return s.Select(size)
+}
+
+type messageQueueSelectorWrapper struct {
+       selector MessageQueueSelector
+
+       m   *Message
+       arg interface{}
+}
+
+func (w *messageQueueSelectorWrapper) Select(size int) int {
+       return w.selector.Select(size, w.m, w.arg)
+}
+
+// MessageQueueSelector select one message queue
+type MessageQueueSelector interface {
+       Select(size int, m *Message, arg interface{}) int
+}
+
+type selectorHolder struct {
+       sync.Mutex
+
+       selectors map[int]*messageQueueSelectorWrapper
+       key       int
+}
+
+func (s *selectorHolder) put(selector *messageQueueSelectorWrapper) (key int) {
+       s.Lock()
+       key = s.key
+       s.selectors[key] = selector
+       s.key++
+       s.Unlock()
+       return
+}
+
+func (s *selectorHolder) getAndDelete(key int) (*messageQueueSelectorWrapper, bool) {
+       s.Lock()
+       selector, ok := s.selectors[key]
+       if ok {
+               delete(s.selectors, key)
+       }
+       s.Unlock()
+
+       return selector, ok
+}
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
new file mode 100644
index 0000000..06743b4
--- /dev/null
+++ b/core/queue_selector_test.go
@@ -0,0 +1,53 @@
+package rocketmq
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+type mockMessageQueueSelector struct {
+       arg  interface{}
+       m    *Message
+       size int
+
+       selectRet int
+}
+
+func (m *mockMessageQueueSelector) Select(size int, msg *Message, arg interface{}) int {
+       m.arg, m.m, m.size = arg, msg, size
+       return m.selectRet
+}
+
+func TestWraper(t *testing.T) {
+       s := &mockMessageQueueSelector{selectRet: 2}
+       w := &messageQueueSelectorWrapper{selector: s, m: &Message{}, arg: 3}
+
+       assert.Equal(t, 2, w.Select(4))
+       assert.Equal(t, w.m, s.m)
+       v, ok := s.arg.(int)
+       assert.True(t, ok)
+       assert.Equal(t, 3, v)
+}
+
+func TestSelectorHolder(t *testing.T) {
+       s := &messageQueueSelectorWrapper{}
+
+       key := selectors.put(s)
+       assert.Equal(t, 0, key)
+
+       key = selectors.put(s)
+       assert.Equal(t, 1, key)
+
+       assert.Equal(t, 2, len(selectors.selectors))
+
+       ss, ok := selectors.getAndDelete(0)
+       assert.Equal(t, s, ss)
+       assert.True(t, ok)
+
+       ss, ok = selectors.getAndDelete(1)
+       assert.Equal(t, s, ss)
+       assert.True(t, ok)
+
+       assert.Equal(t, 0, len(selectors.selectors))
+}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
new file mode 100644
index 0000000..8e78fb9
--- /dev/null
+++ b/examples/orderproducer/producer.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "sync"
+       "sync/atomic"
+
+       rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type queueSelectorByOrderID struct{}
+
+func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
+       return arg.(int) % size
+}
+
+var (
+       namesrvAddrs string
+       topic        string
+       body         string
+       groupID      string
+       msgCount     int64
+       workerCount  int
+)
+
+func init() {
+       flag.StringVar(&namesrvAddrs, "n", "", "name server address")
+       flag.StringVar(&topic, "t", "", "topic")
+       flag.StringVar(&groupID, "g", "", "group")
+       flag.StringVar(&body, "d", "", "body")
+       flag.Int64Var(&msgCount, "m", 0, "message count")
+       flag.IntVar(&workerCount, "w", 0, "worker count")
+}
+
+type worker struct {
+       p            rocketmq.Producer
+       leftMsgCount *int64
+}
+
+func (w *worker) run() {
+       selector := queueSelectorByOrderID{}
+       for atomic.AddInt64(w.leftMsgCount, -1) >= 0 {
+               r := w.p.SendMessageOrderly(
+                       &rocketmq.Message{Topic: topic, Body: body}, selector, 7 /*orderID*/, 3,
+               )
+               fmt.Printf("send result:%+v\n", r)
+       }
+}
+
+func main() {
+       flag.Parse()
+
+       if namesrvAddrs == "" {
+               println("empty namesrv address")
+               return
+       }
+
+       if topic == "" {
+               println("empty topic")
+               return
+       }
+
+       if body == "" {
+               println("empty body")
+               return
+       }
+
+       if groupID == "" {
+               println("empty groupID")
+               return
+       }
+
+       if msgCount == 0 {
+               println("zero message count")
+               return
+       }
+
+       if workerCount == 0 {
+               println("zero worker count")
+               return
+       }
+
+       producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{
+               GroupID:    "testGroup",
+               NameServer: "10.200.20.25:9988",
+       })
+       producer.Start()
+       defer producer.Shutdown()
+
+       wg := sync.WaitGroup{}
+       wg.Add(workerCount)
+
+       workers := make([]worker, workerCount)
+       for i := range workers {
+               workers[i].p = producer
+               workers[i].leftMsgCount = &msgCount
+       }
+
+       for i := range workers {
+               go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+       }
+
+       wg.Wait()
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to