tydhot commented on a change in pull request #286:
URL: https://github.com/apache/dubbo-go-pixiu/pull/286#discussion_r733795016



##########
File path: pkg/filter/event/msg.go
##########
@@ -0,0 +1,38 @@
+package event
+
+// Msq Request Action Type Enum
+
+type MQType string
+
+type MQAction int
+
+const (
+       MQActionPublish = 1 + iota
+       MQActionSubscribe
+       MQActionUnSubscribe
+       MQActionConsumeAck
+)
+
+var MQActionIntToStr = map[MQAction]string{
+       MQActionPublish:     "publish",
+       MQActionSubscribe:   "subscribe",
+       MQActionUnSubscribe: "unsubscribe",
+       MQActionConsumeAck:  "consumeack",
+}
+
+var MQActionStrToInt = map[string]MQAction{
+       "publish":     MQActionPublish,
+       "subscribe":   MQActionSubscribe,
+       "unsubscribe": MQActionUnSubscribe,
+       "consumeack":  MQActionConsumeAck,
+}
+
+// MQRequest url format http://domain/publish/broker/topic
+type MQRequest struct {
+       ConsumerHook string `json:"consumer_hook"` // not empty when subscribe msg, eg: http://10.0.0.1:11451/consume

Review comment:
       感觉 hook 这个名字怪怪的,我觉得 endpoint 或者 url 之类的比较好吧。

##########
File path: pkg/filter/event/msg.go
##########
@@ -0,0 +1,38 @@
+package event
+
+// Msq Request Action Type Enum
+
+type MQType string
+
+type MQAction int
+
+const (
+       MQActionPublish = 1 + iota
+       MQActionSubscribe
+       MQActionUnSubscribe
+       MQActionConsumeAck
+)
+
+var MQActionIntToStr = map[MQAction]string{
+       MQActionPublish:     "publish",
+       MQActionSubscribe:   "subscribe",
+       MQActionUnSubscribe: "unsubscribe",
+       MQActionConsumeAck:  "consumeack",
+}
+
+var MQActionStrToInt = map[string]MQAction{
+       "publish":     MQActionPublish,
+       "subscribe":   MQActionSubscribe,
+       "unsubscribe": MQActionUnSubscribe,
+       "consumeack":  MQActionConsumeAck,
+}
+
+// MQRequest url format http://domain/publish/broker/topic
+type MQRequest struct {

Review comment:
       MQConsumeRequest更直观点吧

##########
File path: pkg/client/mq/mq.go
##########
@@ -0,0 +1,112 @@
+package mq
+
+import (
+       "context"
+       "encoding/json"
+       "github.com/Shopify/sarama"
+       "github.com/apache/dubbo-go-pixiu/pkg/client"
+       "github.com/apache/dubbo-go-pixiu/pkg/client/mq/impl"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "io/ioutil"
+       "strings"
+       "sync"
+
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
+
+       perrors "github.com/pkg/errors"
+)
+
+var (
+       mqClient *Client
+       once     sync.Once
+)
+
+func NewSingletonMQClient(config event.Config) *Client {
+       if mqClient == nil {
+               once.Do(func() {
+                       mqClient, err := NewMQClient(config)
+                       if err != nil {
+                               logger.Errorf()
+                       }
+               })
+       }
+       return mqClient
+}
+
+func NewMQClient(config event.Config) (*Client, error) {
+       var c *Client
+       ctx, cancel := context.WithCancel(context.Background())
+
+       switch mqType {
+       case constant.MQTypeKafka:
+               c = &Client{
+                       ctx:            ctx,
+                       consumerFacade: impl.NewKafkaConsumerFacade(),
+                       producerFacade: nil,
+                       stop:           cancel,
+               }
+       case constant.MQTypeRocketMQ:
+       }
+       return c, nil
+}
+
+func EventConfigToSaramaConfig(config event.Config) (sarama.Config, error) {
+       c := sarama.NewConfig()
+       var err error
+       c.Version, err = sarama.ParseKafkaVersion(config.KafkaVersion)
+       if err != nil {
+               return c, err
+       }
+       return
+}
+
+type Client struct {
+       ctx            context.Context
+       consumerFacade ConsumerFacade
+       producerFacade ProducerFacade
+       stop           func()
+}
+
+func (c Client) Apply() error {
+       panic("implement me")
+}
+
+func (c Client) Close() error {
+       c.stop()
+       c.consumerFacade.Stop()
+       return nil
+}
+
+func (c Client) Call(req *client.Request) (res interface{}, err error) {
+       body, err := ioutil.ReadAll(req.IngressRequest.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       var mqReq event.MQRequest
+       err = json.Unmarshal(body, &mqReq)
+       if err != nil {
+               return nil, err
+       }
+
+       paths := strings.Split(req.API.Path, "/")
+       if len(paths) < 3 {
+               return nil, perrors.New("failed to send message, broker or topic not found")
+       }
+
+       switch event.MQActionStrToInt[paths[0]] {
+       case event.MQActionPublish:
+       case event.MQActionSubscribe:
+       case event.MQActionUnSubscribe:
+       case event.MQActionConsumeAck:

Review comment:
       我觉得 ack 环节应该放在把消息通过 http 发送给消费者之后、检查 response 的环节里,而不是一个由消费者发起的动作。

##########
File path: pkg/filter/event/event.go
##########
@@ -0,0 +1,59 @@
+package event
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+)
+
+const (
+       // Kind is the kind of Fallback.
+       Kind = constant.HTTPCorsFilter
+)
+
+func init() {
+       filter.RegisterHttpFilter(&Plugin{})
+}
+
+type (
+       // Plugin is http filter plugin.
+       Plugin struct {
+       }
+
+       // Filter is http filter instance
+       Filter struct {
+               cfg *Config
+       }
+
+       Config struct {
+               mqType       MQType
+               KafkaVersion string

Review comment:
       这里的config mqType和kafkaVersion不应该放在一起的吧




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to