jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r351584844
 
 

 ##########
 File path: src/brpc/policy/redis_protocol.cpp
 ##########
 @@ -52,62 +56,287 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+class RedisConnContext;
+class ConsumeTaskDone : public google::protobuf::Closure {
+public:
+    ConsumeTaskDone()
+        : _ready(false)
+        , output_message(&_arena) {}
+
+    void Run() override;
+    bool is_ready() { return _ready.load(butil::memory_order_acquire); }
+
+private:
+    butil::atomic<bool> _ready;
+    butil::Arena _arena;
+
+public:
+    RedisMessage output_message;
+    RedisConnContext* meta;
+    butil::IOBuf sendbuf;
+};
+
+
+class RedisConnContext : public brpc::SharedObject {
+public:
+    RedisConnContext() : handler_continue(NULL) {}
+    ~RedisConnContext() {
+        ClearQueue(dones);
+    }
+
+    void Push(ConsumeTaskDone* done) {
+        std::unique_lock<butil::Mutex> m(_mutex);
+        dones.push(done);
+    }
+    void Flush() {
+        std::queue<ConsumeTaskDone*> ready_to_write;
+        SocketUniquePtr s;
+        if (Socket::Address(socket_id, &s) != 0) {
+            LOG(WARNING) << "Fail to address redis socket";
+            return;
+        }
+        {
+            std::unique_lock<butil::Mutex> m(_mutex);
+            if (_writing) return;
+            _writing = true;
+        }
+        Socket::WriteOptions wopt;
+        wopt.ignore_eovercrowded = true;
+        std::queue<ConsumeTaskDone*> ready_to_delete;
+        while (true) {
+            std::unique_lock<butil::Mutex> m(_mutex);
+            while (!dones.empty() && dones.front()->is_ready()) {
+                ready_to_write.push(dones.front());
+                dones.pop();
+            }
+            if (ready_to_write.empty()) {
+                _writing = false;
+                break;
+            }
+            m.unlock();
+
+            while (!ready_to_write.empty()) {
+                ConsumeTaskDone* head = ready_to_write.front();
+                ready_to_write.pop();
+                LOG_IF(WARNING, s->Write(&head->sendbuf, &wopt) != 0)
+                    << "Fail to send redis reply";
+                ready_to_delete.push(head);
+            }
 
 Review comment:
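   [important] 这里期望能在buffer层面合并，而不是写N次
   (English: [important] The replies are expected to be merged at the buffer level here, rather than being written N times.)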

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to