MasterKenway commented on a change in pull request #286:
URL: https://github.com/apache/dubbo-go-pixiu/pull/286#discussion_r734943370



##########
File path: pkg/client/mq/mq.go
##########
@@ -0,0 +1,112 @@
+package mq
+
+import (
+       "context"
+       "encoding/json"
+       "github.com/Shopify/sarama"
+       "github.com/apache/dubbo-go-pixiu/pkg/client"
+       "github.com/apache/dubbo-go-pixiu/pkg/client/mq/impl"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "io/ioutil"
+       "strings"
+       "sync"
+
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
+
+       perrors "github.com/pkg/errors"
+)
+
+var (
+       mqClient *Client
+       once     sync.Once
+)
+
+func NewSingletonMQClient(config event.Config) *Client {
+       if mqClient == nil {
+               once.Do(func() {
+                       mqClient, err := NewMQClient(config)
+                       if err != nil {
+                               logger.Errorf()
+                       }
+               })
+       }
+       return mqClient
+}
+
+func NewMQClient(config event.Config) (*Client, error) {
+       var c *Client
+       ctx, cancel := context.WithCancel(context.Background())
+
+       switch mqType {
+       case constant.MQTypeKafka:
+               c = &Client{
+                       ctx:            ctx,
+                       consumerFacade: impl.NewKafkaConsumerFacade(),
+                       producerFacade: nil,
+                       stop:           cancel,
+               }
+       case constant.MQTypeRocketMQ:
+       }
+       return c, nil
+}
+
+func EventConfigToSaramaConfig(config event.Config) (sarama.Config, error) {
+       c := sarama.NewConfig()
+       var err error
+       c.Version, err = sarama.ParseKafkaVersion(config.KafkaVersion)
+       if err != nil {
+               return c, err
+       }
+       return
+}
+
+type Client struct {
+       ctx            context.Context
+       consumerFacade ConsumerFacade
+       producerFacade ProducerFacade
+       stop           func()
+}
+
+func (c Client) Apply() error {
+       panic("implement me")
+}
+
+func (c Client) Close() error {
+       c.stop()
+       c.consumerFacade.Stop()
+       return nil
+}
+
+func (c Client) Call(req *client.Request) (res interface{}, err error) {
+       body, err := ioutil.ReadAll(req.IngressRequest.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       var mqReq event.MQRequest
+       err = json.Unmarshal(body, &mqReq)
+       if err != nil {
+               return nil, err
+       }
+
+       paths := strings.Split(req.API.Path, "/")
+       if len(paths) < 3 {
+               return nil, perrors.New("failed to send message, broker or 
topic not found")
+       }
+
+       switch event.MQActionStrToInt[paths[0]] {
+       case event.MQActionPublish:
+       case event.MQActionSubscribe:
+       case event.MQActionUnSubscribe:
+       case event.MQActionConsumeAck:

Review comment:
       这是一个我初步的构思 目前的来看异步更适合在长链接的形式中实现 可以先删掉这块




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to