jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r351584650
 
 

 ##########
 File path: src/brpc/policy/redis_protocol.cpp
 ##########
 @@ -52,62 +56,287 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+class RedisConnContext;
+class ConsumeTaskDone : public google::protobuf::Closure {
+public:
+    ConsumeTaskDone()
+        : _ready(false)
+        , output_message(&_arena) {}
+
+    void Run() override;
+    bool is_ready() { return _ready.load(butil::memory_order_acquire); }
+
+private:
+    butil::atomic<bool> _ready;
+    butil::Arena _arena;
+
+public:
+    RedisMessage output_message;
+    RedisConnContext* meta;
+    butil::IOBuf sendbuf;
+};
+
+
+class RedisConnContext : public brpc::SharedObject {
+public:
+    RedisConnContext() : handler_continue(NULL) {}
+    ~RedisConnContext() {
+        ClearQueue(dones);
+    }
+
+    void Push(ConsumeTaskDone* done) {
+        std::unique_lock<butil::Mutex> m(_mutex);
+        dones.push(done);
+    }
+    void Flush() {
+        std::queue<ConsumeTaskDone*> ready_to_write;
+        SocketUniquePtr s;
+        if (Socket::Address(socket_id, &s) != 0) {
+            LOG(WARNING) << "Fail to address redis socket";
+            return;
+        }
+        {
+            std::unique_lock<butil::Mutex> m(_mutex);
+            if (_writing) return;
+            _writing = true;
+        }
+        Socket::WriteOptions wopt;
+        wopt.ignore_eovercrowded = true;
+        std::queue<ConsumeTaskDone*> ready_to_delete;
+        while (true) {
+            std::unique_lock<butil::Mutex> m(_mutex);
+            while (!dones.empty() && dones.front()->is_ready()) {
+                ready_to_write.push(dones.front());
+                dones.pop();
+            }
+            if (ready_to_write.empty()) {
+                _writing = false;
+                break;
+            }
+            m.unlock();
+
+            while (!ready_to_write.empty()) {
+                ConsumeTaskDone* head = ready_to_write.front();
+                ready_to_write.pop();
+                LOG_IF(WARNING, s->Write(&head->sendbuf, &wopt) != 0)
+                    << "Fail to send redis reply";
+                ready_to_delete.push(head);
+            }
+        }
+        ClearQueue(ready_to_delete);
     }
-    // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
-    // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
-    // sending and popped when response comes back, being protected by a
-    // mutex. Previously the mutex is shared with Socket._id_wait_list. When
-    // 200 bthreads access one redis-server, ~1.5s in total is spent on
-    // contention in 10-second duration. If the mutex is separated, the time
-    // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
-    // GivebackPipelinedInfo() to lock only once(when receiving response)
-    // in most cases, and the time decreases to ~0.14s.
-    PipelinedInfo pi;
-    if (!socket->PopPipelinedInfo(&pi)) {
-        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+
+    SocketId socket_id;
+    RedisService::CommandMap command_map;
+    RedisCommandHandler* handler_continue;
+    std::queue<ConsumeTaskDone*> dones;
+
+private:
+    void ClearQueue(std::queue<ConsumeTaskDone*>& queue) {
+        while (!queue.empty()) {
+            ConsumeTaskDone* head = queue.front();
+            queue.pop();
+            delete head;
+        }
     }
 
-    do {
-        InputResponse* msg = 
static_cast<InputResponse*>(socket->parsing_context());
-        if (msg == NULL) {
-            msg = new InputResponse;
-            socket->reset_parsing_context(msg);
+    bool _writing = false;
+    butil::Mutex _mutex;
+};
+
+// Carries one parsed redis command through the execution queue to the
+// consumer (see Consume/ConsumeTask below).
+struct TaskContext {
+    RedisMessage message;  // the parsed command consumed by ConsumeTask()
+    butil::Arena arena;    // presumably owns |message|'s payload (as in ConsumeTaskDone) -- confirm at the producer
+};
+
+const char** ParseArgs(const RedisMessage& message) {
+    const char** args = (const char**)
+        malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
+    for (size_t i = 0; i < message.size(); ++i) {
+        if (!message[i].is_string()) {
+            free(args);
+            return NULL;
         }
+        args[i] = message[i].c_str();
+    }
+    args[message.size()] = NULL;
+    return args;
+}
+
+// Invoked when the handler has finished filling |output_message|.
+// Order matters here: serialize first, then publish |_ready| with release
+// semantics, so a concurrent Flush() that observes is_ready()==true (via
+// the acquire load) also observes the completed |sendbuf|.
+void ConsumeTaskDone::Run() { 
+    // Adopts (add_ref=false) the reference taken by AddRefManually() in
+    // ConsumeTask(); the context is released when this scope ends, i.e.
+    // only after Flush() has returned.
+    butil::intrusive_ptr<RedisConnContext> delete_queue_meta(meta, false);
+    output_message.SerializeToIOBuf(&sendbuf);
+    _ready.store(true, butil::memory_order_release);
+    meta->Flush();
+}
 
-        const int consume_count = (pi.with_auth ? 1 : pi.count);
+int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) {
+    ConsumeTaskDone* done = new ConsumeTaskDone;
+    ClosureGuard done_guard(done);
+    meta->Push(done);
+    meta->AddRefManually();
+    done->meta = meta;
+    RedisMessage& output = done->output_message;
 
-        ParseError err = msg->response.ConsumePartialIOBuf(*source, 
consume_count);
+    const char** args = ParseArgs(m);
+    if (!args) {
+        output.SetError("ERR command not string");
+        return -1;
+    }
+    if (meta->handler_continue) {
+        RedisCommandHandler::Result result = meta->handler_continue->Run(
+                args, &output, done_guard.release());
+        if (result == RedisCommandHandler::OK) {
+            meta->handler_continue = NULL;
+        }
+    } else {
+        std::string comm;
+        comm.reserve(8);
+        for (const char* c = m[0].c_str(); *c; ++c) {
+            comm.push_back(std::tolower(*c));
+        }
+        auto it = meta->command_map.find(comm);
+        if (it == meta->command_map.end()) {
+            char buf[64];
+            snprintf(buf, sizeof(buf), "ERR unknown command `%s`", 
comm.c_str());
+            output.SetError(buf);
+        } else {
+            RedisCommandHandler::Result result =
+                it->second->Run(args, &output, done_guard.release());
+            if (result == RedisCommandHandler::CONTINUE) {
+                meta->handler_continue = it->second.get();
+            }
+        }
+    }
+    free(args);
+    return 0;
+}
+
+int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
+    RedisConnContext* qmeta = static_cast<RedisConnContext*>(meta);
+    if (iter.is_queue_stopped()) {
+        qmeta->RemoveRefManually();
+        return 0;
+    }
+    for (; iter; ++iter) {
+        std::unique_ptr<TaskContext> ctx(*iter);
+        ConsumeTask(qmeta, ctx->message);
+    }
+    return 0;
+}
+
+class ServerContext : public Destroyable {
 
 Review comment:
   和RedisConnContext分开的意义是什么?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to