tydhot commented on a change in pull request #300:
URL: https://github.com/apache/dubbo-go-pixiu/pull/300#discussion_r754068323



##########
File path: pkg/client/mq/mq.go
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+       "context"
+       "encoding/json"
+       "io/ioutil"
+       "strings"
+       "sync"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/client"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+       perrors "github.com/pkg/errors"
+)
+
+var (
+       mqClient          *Client
+       once              sync.Once
+       consumerFacadeMap sync.Map
+)
+
+func NewSingletonMQClient(config Config) *Client {
+       if mqClient == nil {
+               once.Do(func() {
+                       var err error
+                       mqClient, err = NewMQClient(config)
+                       if err != nil {
+                               logger.Errorf("create mq client failed, %s", err.Error())
+                       }
+               })
+       }
+       return mqClient
+}
+
+func NewMQClient(config Config) (*Client, error) {
+       var c *Client
+       ctx := context.Background()
+       switch config.MqType {
+       case constant.MQTypeKafka:
+               pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
+               if err != nil {
+                       return nil, err
+               }
+               c = &Client{
+                       ctx:                 ctx,
+                       producerFacade:      pf,
+                       kafkaConsumerConfig: config.KafkaConsumerConfig,
+               }
+       case constant.MQTypeRocketMQ:
+               return nil, perrors.New("rocketmq not support")
+       }
+
+       return c, nil
+}
+
+type Client struct {
+       ctx                 context.Context
+       producerFacade      ProducerFacade
+       kafkaConsumerConfig KafkaConsumerConfig
+}
+
+func (c Client) Apply() error {
+       panic("implement me")
+}
+
+func (c Client) Close() error {
+       return nil
+}
+
+func (c Client) Call(req *client.Request) (res interface{}, err error) {
+       body, err := ioutil.ReadAll(req.IngressRequest.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       paths := strings.Split(req.API.Path, "/")
+       if len(paths) < 3 {
+               return nil, perrors.New("failed to send message, broker or Topic not found")
+       }
+
+       switch MQActionStrToInt[paths[0]] {
+       case MQActionPublish:
+               var pReq MQProduceRequest
+               err = json.Unmarshal(body, &pReq)
+               if err != nil {
+                       return nil, err
+               }
+               err = c.producerFacade.Send(pReq.Msg, WithTopic(pReq.Topic))
+               if err != nil {
+                       return nil, err
+               }
+       case MQActionSubscribe:
+               var cReq MQSubscribeRequest
+               err = json.Unmarshal(body, &cReq)
+               if err != nil {
+                       return nil, err
+               }
+               if _, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); !ok {

Review comment:
       这个场景我希望先简单化：订阅之后、解除订阅之前，先不允许改变订阅关系；后面再综合使用场景和各个 MQ 的特性，来调整这里的逻辑。
因为比如你先订阅 A、再订阅 B，那么这里最后的关系是 A、B 都订阅呢，还是覆盖掉之前的 A 呢？还有，这里的 topic 关系修改对于各个 MQ 的 client API 的改动可能都不尽相同。
如果把这些都考虑上，整个业务逻辑的复杂度一下子就上升了，不如先从简。




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to