wwbmmm commented on code in PR #2093:
URL: https://github.com/apache/brpc/pull/2093#discussion_r1073125489


##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
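+[MySQL](https://www.mysql.com/) is a well-known open-source relational database. To let users access MySQL more conveniently and take full advantage of bthread's concurrency, brpc supports the MySQL protocol directly. Example program: [example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**Note**: Only the text protocol of MySQL 4.1 and later is supported. Transactions are supported; Prepared statement is not supported. The only authentication method currently supported is mysql_native_password, and the single connection mode cannot be used with transactions.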

Review Comment:
   Isn't Prepared statement supported now?



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
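+[MySQL](https://www.mysql.com/) is a well-known open-source relational database. To let users access MySQL more conveniently and take full advantage of bthread's concurrency, brpc supports the MySQL protocol directly. Example program: [example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**Note**: Only the text protocol of MySQL 4.1 and later is supported. Transactions are supported; Prepared statement is not supported. The only authentication method currently supported is mysql_native_password, and the single connection mode cannot be used with transactions.
+
+Advantages over [libmysqlclient](https://dev.mysql.com/downloads/connector/c/) (the official client):
+
+- Thread safety. Users do not need to create a separate client for each thread.
+- Supports synchronous, asynchronous, and semi-synchronous access, and can be used with combo channels such as [ParallelChannel](combo_channel.md).
+- Supports multiple [connection types](client.md#连接方式), as well as timeouts, backup requests, cancellation, tracing, builtin services, and other benefits that brpc provides.
+- Explicit return-type checking: receiving a MySQL data type with a variable of the wrong type throws an exception.
+- Calling the standard MySQL library blocks the framework's concurrency, whereas this implementation takes full advantage of it.
+- The brpc MySQL implementation does not block pthreads, while libmysqlclient does (see [threading](bthread.md)), and using MySQL's asynchronous API makes programming much more complicated.
+# Accessing MySQL
+
+Create a Channel for accessing MySQL: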
+
+```c++
+# include <brpc/mysql.h>
+# include <brpc/policy/mysql_authenticator.h>
+# include <brpc/channel.h>
+
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator("yangliming01", "123456", "test",
+    "charset=utf8&collation_connection=utf8_unicode_ci");
+if (channel.Init("127.0.0.1", 3306, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+```
+
+Send a command to MySQL.
+
+```c++
+// Execute any MySQL command; commands can be batched, e.g. "select * from tab1;select * from tab2"
+std::string command = "show databases"; // select,delete,update,insert,create,drop ...
+brpc::MysqlRequest request;
+if (!request.Query(command)) {
+    LOG(ERROR) << "Fail to add command";
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (!cntl.Failed()) {
+    std::cout << response << std::endl;
+} else {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return false;
+}
+return true;
+```
+
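+Notes on the code above:
+
+- The request type must be MysqlRequest and the response type must be MysqlResponse; otherwise CallMethod fails. No stub is needed: call channel.CallMethod directly and pass NULL as the method.
+- Call request.Query() with the command to execute. Commands can be batched, separated by semicolons.
+- Call response.reply(X) in order to retrieve each result, and receive it with the type that matches the reply, e.g. MysqlReply::Ok, MysqlReply::Error, const MysqlReply::Columnconst MysqlReply::Row, etc.
+- A single command yields one reply; a batch yields multiple replies.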
+
+Currently supported request operations:
+
+```c++
+bool Query(const butil::StringPiece& command);
+```
+
+Corresponding reply operations:
+
+```c++
+// Return results of different types
+const MysqlReply::Auth& auth() const;
+const MysqlReply::Ok& ok() const;
+const MysqlReply::Error& error() const;
+const MysqlReply::Eof& eof() const;
+// Operations on the result set
+// get column number
+uint64_t MysqlReply::column_number() const;
+// get one column
+const MysqlReply::Column& MysqlReply::column(const uint64_t index) const;
+// get row number
+uint64_t MysqlReply::row_number() const;
+// get one row
+const MysqlReply::Row& MysqlReply::next() const;
+// Operations on each field in the result set
+const MysqlReply::Field& MysqlReply::Row::field(const uint64_t index) const;
+```
+
+# Transactions
+
+A transaction guarantees that the multiple RPC requests within it either all succeed or all fail.
+
+```c++
+rpc::Channel channel;

Review Comment:
   This Channel initialization code can be omitted.



##########
src/brpc/controller.h:
##########
@@ -815,6 +824,13 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     // Defined at both sides
     StreamSettings *_remote_stream_settings;
 
+    // controller bind socket action
+    BindSockAction _bind_sock_action;
+    // controller bind sock
+    SocketUniquePtr _bind_sock;
+    // sql prepare statement
+    MysqlStatementStub *_stmt;

Review Comment:
   Suggest naming it _mysql_stmt.



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
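+[MySQL](https://www.mysql.com/) is a well-known open-source relational database. To let users access MySQL more conveniently and take full advantage of bthread's concurrency, brpc supports the MySQL protocol directly. Example program: [example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**Note**: Only the text protocol of MySQL 4.1 and later is supported. Transactions are supported; Prepared statement is not supported. The only authentication method currently supported is mysql_native_password, and the single connection mode cannot be used with transactions.
+
+Advantages over [libmysqlclient](https://dev.mysql.com/downloads/connector/c/) (the official client):
+
+- Thread safety. Users do not need to create a separate client for each thread.
+- Supports synchronous, asynchronous, and semi-synchronous access, and can be used with combo channels such as [ParallelChannel](combo_channel.md).
+- Supports multiple [connection types](client.md#连接方式), as well as timeouts, backup requests, cancellation, tracing, builtin services, and other benefits that brpc provides.
+- Explicit return-type checking: receiving a MySQL data type with a variable of the wrong type throws an exception.
+- Calling the standard MySQL library blocks the framework's concurrency, whereas this implementation takes full advantage of it.
+- The brpc MySQL implementation does not block pthreads, while libmysqlclient does (see [threading](bthread.md)), and using MySQL's asynchronous API makes programming much more complicated.
+# Accessing MySQL
+
+Create a Channel for accessing MySQL: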
+
+```c++
+# include <brpc/mysql.h>
+# include <brpc/policy/mysql_authenticator.h>
+# include <brpc/channel.h>
+
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator("yangliming01", "123456", "test",
+    "charset=utf8&collation_connection=utf8_unicode_ci");
+if (channel.Init("127.0.0.1", 3306, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+```
+
+Send a command to MySQL.
+
+```c++
+// Execute any MySQL command; commands can be batched, e.g. "select * from tab1;select * from tab2"
+std::string command = "show databases"; // select,delete,update,insert,create,drop ...
+brpc::MysqlRequest request;
+if (!request.Query(command)) {
+    LOG(ERROR) << "Fail to add command";
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (!cntl.Failed()) {
+    std::cout << response << std::endl;
+} else {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return false;
+}
+return true;
+```
+
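+Notes on the code above:
+
+- The request type must be MysqlRequest and the response type must be MysqlResponse; otherwise CallMethod fails. No stub is needed: call channel.CallMethod directly and pass NULL as the method.
+- Call request.Query() with the command to execute. Commands can be batched, separated by semicolons.
+- Call response.reply(X) in order to retrieve each result, and receive it with the type that matches the reply, e.g. MysqlReply::Ok, MysqlReply::Error, const MysqlReply::Columnconst MysqlReply::Row, etc.
+- A single command yields one reply; a batch yields multiple replies.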
+
+Currently supported request operations:
+
+```c++
+bool Query(const butil::StringPiece& command);
+```
+
+Corresponding reply operations:
+
+```c++
+// Return results of different types
+const MysqlReply::Auth& auth() const;
+const MysqlReply::Ok& ok() const;
+const MysqlReply::Error& error() const;
+const MysqlReply::Eof& eof() const;
+// Operations on the result set
+// get column number
+uint64_t MysqlReply::column_number() const;
+// get one column
+const MysqlReply::Column& MysqlReply::column(const uint64_t index) const;
+// get row number
+uint64_t MysqlReply::row_number() const;
+// get one row
+const MysqlReply::Row& MysqlReply::next() const;
+// Operations on each field in the result set
+const MysqlReply::Field& MysqlReply::Row::field(const uint64_t index) const;
+```
+
+# Transactions
+
+A transaction guarantees that the multiple RPC requests within it either all succeed or all fail.
+
+```c++
+rpc::Channel channel;
+// Initialize the channel, NULL means using default options.
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.connect_timeout_ms = FLAGS_connect_timeout_ms;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator(
+    FLAGS_user, FLAGS_password, FLAGS_schema, FLAGS_params);
+if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+
+// create transaction
+brpc::MysqlTransactionOptions options;
+options.readonly = FLAGS_readonly;
+options.isolation_level = brpc::MysqlIsolationLevel(FLAGS_isolation_level);
+auto tx(brpc::NewMysqlTransaction(channel, options));
+if (tx == NULL) {
+    LOG(ERROR) << "Fail to create transaction";
+    return false;
+}
+
+brpc::MysqlRequest request(tx.get());
+if (!request.Query(*it)) {
+    LOG(ERROR) << "Fail to add command";
+    tx->rollback();
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (cntl.Failed()) {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    tx->rollback();
+    return false;
+}
+// handle response
+std::cout << response << std::endl;
+bool rc = tx->commit();
+```
+
+# Prepared Statement
+
+For an SQL statement that is executed many times, a prepared statement registers the statement with mysql-server so that the server does not have to parse it on every request, which improves performance.
+
+```c++
+rpc::Channel channel;
+// Initialize the channel, NULL means using default options.
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.connect_timeout_ms = FLAGS_connect_timeout_ms;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator(
+    FLAGS_user, FLAGS_password, FLAGS_schema, FLAGS_params);
+if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+
+auto stmt(brpc::NewMysqlStatement(channel, "select * from tb where name=?"));
+if (stmt == NULL) {
+    LOG(ERROR) << "Fail to create mysql statement";
+    return -1;
+}
+
+brpc::MysqlRequest request(stmt.get());
+if (!request.AddParam("lilei")) {
+    LOG(ERROR) << "Fail to add name param";
+    return NULL;
+}
+
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel->CallMethod(NULL, &cntl, &request, &response, NULL);
+if (cntl.Failed()) {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return NULL;
+}
+
+std::cout << response << std::endl;
+```
+
+
+
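+# Performance tests
+
+I wrote test programs under the example/mysql_c++ directory: mysql_press.cpp, mysqlclient_press.cpp, and mysql_go_press.go. The first uses the brpc framework, the second uses libmysqlclient, and the third uses [go-sql-driver](https://github.com/go-sql-driver)/**go-mysql** to access MySQL.
+
+Single-threaded tests
+
+##### Accessing MySQL with the brpc framework (single thread)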
+
+./mysql_press -thread_num=1 -op_type=0 // insert
+
+```
+qps=3071 latency=320
+qps=3156 latency=311
+qps=3166 latency=310
+qps=3151 latency=312
+qps=3093 latency=317
+qps=3146 latency=312
+qps=3139 latency=313
+qps=3114 latency=315
+qps=3055 latency=321
+qps=3135 latency=313
+qps=2611 latency=376
+qps=3072 latency=320
+qps=3026 latency=324
+qps=2792 latency=352
+qps=3181 latency=309
+qps=3181 latency=309
+qps=3197 latency=307
+qps=3024 latency=325
+```
+
+./mysql_press -thread_num=1 -op_type=1
+
+```
+qps=6414 latency=151
+qps=5292 latency=182
+qps=6700 latency=144
+qps=6858 latency=141
+qps=6915 latency=140
+qps=6822 latency=142
+qps=6722 latency=144
+qps=6852 latency=141
+qps=6713 latency=144
+qps=6741 latency=144
+qps=6734 latency=144
+qps=6611 latency=146
+qps=6554 latency=148
+qps=6810 latency=142
+qps=6787 latency=143
+qps=6737 latency=144
+qps=6579 latency=147
+qps=6634 latency=146
+qps=6716 latency=144
+qps=6711 latency=144
+```
+
+./mysql_press -thread_num=1 -op_type=2 // update
+
+```
+qps=3090 latency=318
+qps=3452 latency=284
+qps=3239 latency=303
+qps=3328 latency=295
+qps=3218 latency=305
+qps=3251 latency=302
+qps=2516 latency=391
+qps=2874 latency=342
+qps=3366 latency=292
+qps=3249 latency=302
+qps=3346 latency=294
+qps=3486 latency=282
+qps=3457 latency=284
+qps=3439 latency=286
+qps=3386 latency=290
+qps=3352 latency=293
+qps=3253 latency=302
+qps=3341 latency=294
+```
+
+##### libmysqlclient implementation (single thread)
+
+./mysqlclient_press -thread_num=1 -op_type=0 // insert
+
+```
+qps=3166 latency=313
+qps=3157 latency=314
+qps=2941 latency=337
+qps=3270 latency=303
+qps=3305 latency=300
+qps=3445 latency=287
+qps=3455 latency=287
+qps=3449 latency=287
+qps=3486 latency=284
+qps=3551 latency=279
+qps=3517 latency=281
+qps=3283 latency=302
+qps=3353 latency=295
+qps=2564 latency=386
+qps=3243 latency=305
+qps=3333 latency=297
+qps=3598 latency=275
+qps=3714 latency=267
+```
+
+./mysqlclient_press -thread_num=1 -op_type=1
+
+```
+qps=8209 latency=120
+qps=8022 latency=123
+qps=7879 latency=125
+qps=8083 latency=122
+qps=8504 latency=116
+qps=8112 latency=121
+qps=8278 latency=119
+qps=8698 latency=113
+qps=8817 latency=112
+qps=8755 latency=112
+qps=8734 latency=113
+qps=8390 latency=117
+qps=8230 latency=120
+qps=8486 latency=116
+qps=8038 latency=122
+qps=8640 latency=114
+```
+
+./mysqlclient_press -thread_num=1 -op_type=2 // update
+
+```
+qps=3583 latency=276
+qps=3530 latency=280
+qps=3610 latency=274
+qps=3492 latency=283
+qps=3508 latency=282
+qps=3465 latency=286
+qps=3543 latency=279
+qps=3610 latency=274
+qps=3567 latency=278
+qps=3381 latency=293
+qps=3514 latency=282
+qps=3461 latency=286
+qps=3456 latency=286
+qps=3517 latency=281
+qps=3492 latency=284
+```
+
+##### Accessing MySQL with golang (single thread)
+
+go run test.go -thread_num=1
+
+```
+qps = 6905 latency = 144
+qps = 6922 latency = 143
+qps = 6931 latency = 143
+qps = 6998 latency = 142
+qps = 6780 latency = 146
+qps = 6980 latency = 142
+qps = 6901 latency = 144
+qps = 6887 latency = 144
+qps = 6943 latency = 143
+qps = 6880 latency = 144
+qps = 6815 latency = 146
+qps = 6089 latency = 163
+qps = 6626 latency = 150
+qps = 6361 latency = 156
+qps = 6783 latency = 146
+qps = 6789 latency = 146
+qps = 6883 latency = 144
+qps = 6795 latency = 146
+qps = 6724 latency = 148
+qps = 6861 latency = 145
+qps = 6878 latency = 144
+qps = 6842 latency = 146
+```
+
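+These results suggest that the brpc MySQL implementation performs similarly to libmysqlclient for insert, update, and delete operations, but falls behind libmysqlclient on queries, where its performance is close to that of the golang MySQL implementation.
+
+##### Accessing MySQL with the brpc framework (50 threads)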
+
+./mysql_press -thread_num=50 -op_type=1 -use_bthread=true
+
+```
+qps=18843 latency=2656
+qps=22426 latency=2226
+qps=22536 latency=2203
+qps=22560 latency=2193
+qps=22270 latency=2226
+qps=22302 latency=2247
+qps=22147 latency=2225
+qps=22517 latency=2228
+qps=22762 latency=2176
+qps=23061 latency=2162
+qps=23819 latency=2070
+qps=23852 latency=2077
+qps=22682 latency=2214
+qps=22381 latency=2213
+qps=24041 latency=2069
+qps=24562 latency=2022
+qps=24874 latency=2004
+qps=24821 latency=1988
+qps=24209 latency=2073
+qps=21706 latency=2281
+```
+
+##### libmysqlclient implementation (50 threads)
+
+./mysql_press -thread_num=50 -op_type=1 -use_bthread=true
+
+```
+qps=23656 latency=378
+qps=16190 latency=555
+qps=20136 latency=445
+qps=22238 latency=401
+qps=22229 latency=403
+qps=19109 latency=470
+qps=22569 latency=394
+qps=26250 latency=343
+qps=28208 latency=318
+qps=29649 latency=301
+qps=29874 latency=301
+qps=30033 latency=301
+qps=25911 latency=345
+qps=28048 latency=317
+qps=27398 latency=329
+```
+
+##### Accessing MySQL with golang (50 goroutines)
+
+go run ../mysql_go_press.go -thread_num=50
+
+```
+qps = 23660 latency = 2049
+qps = 23198 latency = 2160
+qps = 23765 latency = 2181
+qps = 23323 latency = 2149
+qps = 14833 latency = 2136
+qps = 23822 latency = 2853
+qps = 20389 latency = 2474
+qps = 23290 latency = 2151
+qps = 23526 latency = 2153
+qps = 21426 latency = 2613
+qps = 23339 latency = 2155
+qps = 25623 latency = 2084
+qps = 23048 latency = 2210
+qps = 20694 latency = 2423
+qps = 23705 latency = 2122
+qps = 23445 latency = 2125
+qps = 24368 latency = 2054
+qps = 23027 latency = 2175
+qps = 24307 latency = 2063
+qps = 23227 latency = 2096
+qps = 23646 latency = 2173
+```
+
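+The runs above issue queries at a concurrency of 50. The qps figures look similar, but libmysqlclient has noticeably lower latency.
+
+##### Accessing MySQL with the brpc framework (100 threads)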
+
+./mysql_press -thread_num=100 -op_type=1 -use_bthread=true
+
+```
+qps=26428 latency=3764
+qps=26305 latency=3780
+qps=26390 latency=3779
+qps=26278 latency=3787
+qps=26326 latency=3787
+qps=26266 latency=3792
+qps=26394 latency=3773
+qps=26263 latency=3797
+qps=26250 latency=3783
+qps=26362 latency=3782
+qps=26212 latency=3796
+qps=26260 latency=3800
+qps=24666 latency=4035
+qps=25569 latency=3896
+qps=26223 latency=3794
+qps=25538 latency=3890
+qps=20065 latency=4958
+qps=23023 latency=4331
+qps=25808 latency=3875
+```
+
+##### libmysqlclient implementation (100 threads)
+
+./mysql_press -thread_num=50 -op_type=1 -use_bthread=true

Review Comment:
   50 should be changed to 100.



##########
src/brpc/mysql.cpp:
##########
@@ -0,0 +1,695 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <google/protobuf/stubs/once.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/wire_format_lite_inl.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/reflection_ops.h>
+#include <google/protobuf/wire_format.h>
+#include "butil/string_printf.h"
+#include "butil/macros.h"
+#include "brpc/controller.h"
+#include "brpc/mysql.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+
+DEFINE_int32(mysql_multi_replies_size, 10, "multi replies size in one MysqlResponse");
+
+// Internal implementation detail -- do not call these.
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl();
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto();
+
+namespace {
+
+const ::google::protobuf::Descriptor* MysqlRequest_descriptor_ = NULL;
+const ::google::protobuf::Descriptor* MysqlResponse_descriptor_ = NULL;
+
+}  // namespace
+
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    const ::google::protobuf::FileDescriptor* file =
+        ::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
+            "baidu/rpc/mysql_base.proto");
+    GOOGLE_CHECK(file != NULL);
+    MysqlRequest_descriptor_ = file->message_type(0);
+    MysqlResponse_descriptor_ = file->message_type(1);
+}
+
+namespace {
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
+inline void protobuf_AssignDescriptorsOnce() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
+                                       &protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+void protobuf_RegisterTypes(const ::std::string&) {
+    protobuf_AssignDescriptorsOnce();
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlRequest_descriptor_, &MysqlRequest::default_instance());
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlResponse_descriptor_, &MysqlResponse::default_instance());
+}
+
+}  // namespace
+
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    delete MysqlRequest::default_instance_;
+    delete MysqlResponse::default_instance_;
+}
+
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl() {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+#if GOOGLE_PROTOBUF_VERSION >= 3002000
+    ::google::protobuf::internal::InitProtobufDefaults();
+#else
+    ::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
+#endif
+    ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
+        "\n\032baidu/rpc/mysql_base.proto\022\tbaidu.rpc\032"
+        " google/protobuf/descriptor.proto\"\016\n\014Mys"
+        "qlRequest\"\017\n\rMysqlResponseB\003\200\001\001",
+        111);
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile("baidu/rpc/mysql_base.proto",
+                                                                      &protobuf_RegisterTypes);
+    MysqlRequest::default_instance_ = new MysqlRequest();
+    MysqlResponse::default_instance_ = new MysqlResponse();
+    MysqlRequest::default_instance_->InitAsDefaultInstance();
+    MysqlResponse::default_instance_->InitAsDefaultInstance();
+    ::google::protobuf::internal::OnShutdown(
+        &protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once);
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once,
+                                       &protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl);
+}
+
+// Force AddDescriptors() to be called at static initialization time.
+struct StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto {
+    StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto() {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+} static_descriptor_initializer_baidu_2frpc_2fmysql_5fbase_2eproto_;
+
+
+// ===================================================================
+
+#ifndef _MSC_VER
+#endif  // !_MSC_VER
+
+butil::Status MysqlStatementStub::PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id) {
+    butil::Status st;
+    // long data
+    for (const auto& i : _long_data) {
+        st = MysqlMakeLongDataPacket(outbuf, stmt_id, i.param_id, i.long_data);
+        if (!st.ok()) {
+            LOG(ERROR) << "make long data header error " << st;
+            return st;
+        }
+    }
+    _long_data.clear();
+    // execute data
+    st = MysqlMakeExecutePacket(outbuf, stmt_id, _execute_data);
+    if (!st.ok()) {
+        LOG(ERROR) << "make execute header error " << st;
+        return st;
+    }
+    _execute_data.clear();
+    _null_mask.mask.clear();
+    _null_mask.area = butil::IOBuf::INVALID_AREA;
+    _param_types.types.clear();
+    _param_types.area = butil::IOBuf::INVALID_AREA;
+
+    return st;
+}
+
+MysqlRequest::MysqlRequest() : ::google::protobuf::Message() {
+    SharedCtor();
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx) : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+}
+
+MysqlRequest::MysqlRequest(MysqlStatement* stmt) : ::google::protobuf::Message() {
+    SharedCtor();
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt)
+    : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+void MysqlRequest::InitAsDefaultInstance() {}
+
+MysqlRequest::MysqlRequest(const MysqlRequest& from) : ::google::protobuf::Message() {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+void MysqlRequest::SharedCtor() {
+    _has_error = false;
+    _cached_size_ = 0;
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+    _param_index = 0;
+}
+
+MysqlRequest::~MysqlRequest() {
+    SharedDtor();
+    if (_stmt != NULL) {
+        delete _stmt;
+    }
+    _stmt = NULL;
+}
+
+void MysqlRequest::SharedDtor() {
+    if (this != default_instance_) {
+    }
+}
+
+void MysqlRequest::SetCachedSize(int size) const {
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+}
+const ::google::protobuf::Descriptor* MysqlRequest::descriptor() {
+    protobuf_AssignDescriptorsOnce();
+    return MysqlRequest_descriptor_;
+}
+
+const MysqlRequest& MysqlRequest::default_instance() {
+    if (default_instance_ == NULL) {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+    return *default_instance_;
+}
+
+MysqlRequest* MysqlRequest::default_instance_ = NULL;
+
+MysqlRequest* MysqlRequest::New() const {
+    return new MysqlRequest;
+}
+
+void MysqlRequest::Clear() {
+    _has_error = false;
+    _buf.clear();
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+}
+
+bool MysqlRequest::MergePartialFromCodedStream(::google::protobuf::io::CodedInputStream*) {
+    LOG(WARNING) << "You're not supposed to parse a MysqlRequest";
+    return true;
+}
+
+void MysqlRequest::SerializeWithCachedSizes(::google::protobuf::io::CodedOutputStream*) const {
+    LOG(WARNING) << "You're not supposed to serialize a MysqlRequest";
+}
+
+::google::protobuf::uint8* MysqlRequest::SerializeWithCachedSizesToArray(
+    ::google::protobuf::uint8* target) const {
+    return target;
+}
+
+int MysqlRequest::ByteSize() const {
+    int total_size = _buf.size();
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = total_size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+    return total_size;
+}
+
+void MysqlRequest::MergeFrom(const ::google::protobuf::Message& from) {
+    GOOGLE_CHECK_NE(&from, this);
+    const MysqlRequest* source =
+        ::google::protobuf::internal::dynamic_cast_if_available<const MysqlRequest*>(&from);
+    if (source == NULL) {
+        ::google::protobuf::internal::ReflectionOps::Merge(from, this);
+    } else {
+        MergeFrom(*source);
+    }
+}
+
+void MysqlRequest::MergeFrom(const MysqlRequest& from) {
+    // TODO: maybe need to optimize

Review Comment:
   Could leaving this unimplemented cause problems? It should at least report an error to prevent misuse.
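
One way to act on this comment is sketched below with a hypothetical stand-in class, not the real MysqlRequest: the unimplemented merge records and reports an error instead of silently returning. `RequestSketch` and the message text are assumptions for illustration; the real code would presumably use LOG(ERROR) and the existing `_has_error` flag.

```cpp
#include <iostream>

// Hypothetical stand-in for a request type whose merge is not implemented.
// It is NOT the real brpc::MysqlRequest.
class RequestSketch {
public:
    // Flag and report misuse instead of silently doing nothing.
    void MergeFrom(const RequestSketch& /*from*/) {
        _has_error = true;
        std::cerr << "MergeFrom is not supported by RequestSketch" << std::endl;
    }
    bool has_error() const { return _has_error; }

private:
    bool _has_error = false;
};
```

Note that in the quoted hunk the copy constructor calls MergeFrom(from), so a MergeFrom that reports an error would also flag every copied request; the copy path may need separate handling.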



##########
src/brpc/socket.cpp:
##########
@@ -1512,7 +1515,7 @@ int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
     if (options_in) {
         opt = *options_in;
     }
-    if (data->empty()) {
+    if (data->empty() && !opt.with_auth) {

Review Comment:
   The with_auth field has been renamed to auth_flags, and its type has changed as well.



##########
src/brpc/mysql_command.h:
##########
@@ -0,0 +1,92 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#ifndef BRPC_MYSQL_COMMAND_H
+#define BRPC_MYSQL_COMMAND_H
+
+#include <vector>
+#include "butil/iobuf.h"
+#include "butil/status.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+// mysql command types
+enum MysqlCommandType : unsigned char {
+    MYSQL_COM_SLEEP,
+    MYSQL_COM_QUIT,
+    MYSQL_COM_INIT_DB,
+    MYSQL_COM_QUERY,
+    MYSQL_COM_FIELD_LIST,
+    MYSQL_COM_CREATE_DB,
+    MYSQL_COM_DROP_DB,
+    MYSQL_COM_REFRESH,
+    MYSQL_COM_SHUTDOWN,
+    MYSQL_COM_STATISTICS,
+    MYSQL_COM_PROCESS_INFO,
+    MYSQL_COM_CONNECT,
+    MYSQL_COM_PROCESS_KILL,
+    MYSQL_COM_DEBUG,
+    MYSQL_COM_PING,
+    MYSQL_COM_TIME,
+    MYSQL_COM_DELAYED_INSERT,
+    MYSQL_COM_CHANGE_USER,
+    MYSQL_COM_BINLOG_DUMP,
+    MYSQL_COM_TABLE_DUMP,
+    MYSQL_COM_CONNECT_OUT,
+    MYSQL_COM_REGISTER_SLAVE,
+    MYSQL_COM_STMT_PREPARE,
+    MYSQL_COM_STMT_EXECUTE,
+    MYSQL_COM_STMT_SEND_LONG_DATA,
+    MYSQL_COM_STMT_CLOSE,
+    MYSQL_COM_STMT_RESET,
+    MYSQL_COM_SET_OPTION,
+    MYSQL_COM_STMT_FETCH,
+    MYSQL_COM_DAEMON,
+    MYSQL_COM_BINLOG_DUMP_GTID,
+    MYSQL_COM_RESET_CONNECTION,
+};
+
+butil::Status MysqlMakeCommand(butil::IOBuf* outbuf,
+                               const MysqlCommandType type,
+                               const butil::StringPiece& stmt);
+
+// Prepared Statement Protocol
+// an prepared statement has a unique statement id in one connection (in brpc SocketId), an prepared
+// statement can be executed in many connections, so ever connection has a different statement id.
+// In bprc, we can only get a connection in the stage of PackXXXRequest which is behind our building

Review Comment:
   bprc -> brpc



##########
example/mysql_c++/CMakeLists.txt:
##########
@@ -0,0 +1,148 @@
+cmake_minimum_required(VERSION 2.8.10)

Review Comment:
   Add a license header.



##########
src/brpc/policy/mysql_authenticator.cpp:
##########
@@ -0,0 +1,176 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Author(s): Yang,Liming <yanglimin...@baidu.com>
+
+#include <vector>
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_auth_hash.h"
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_reply.h"
+#include "brpc/mysql_common.h"
+#include "butil/base64.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+namespace {
+const butil::StringPiece mysql_native_password("mysql_native_password");
+const char* auth_param_delim = "\t";
+bool MysqlHandleParams(const butil::StringPiece& params, std::string* param_cmd) {
+    if (params.empty()) {
+        return true;
+    }
+    const char* delim1 = "&";
+    std::vector<size_t> idx;
+    for (size_t p = params.find(delim1); p != butil::StringPiece::npos;

Review Comment:
   You can use butil::StringSplitter or butil::SplitString here.
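
The splitting this comment has in mind might look roughly like the sketch below. It uses only the standard library: `SplitParams` is a hypothetical helper, not brpc API, and it assumes the params string is an `&`-separated list of `key=value` pairs, as in the doc example `charset=utf8&collation_connection=utf8_unicode_ci`. In the PR itself, butil::StringSplitter or butil::SplitString would take the place of the manual `find` loop.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of brpc): splits "k1=v1&k2=v2" into
// key/value pairs. Empty fields between separators are skipped; a field
// without '=' or with an empty key makes the whole call fail.
bool SplitParams(const std::string& params,
                 std::vector<std::pair<std::string, std::string>>* out) {
    std::string::size_type begin = 0;
    while (begin <= params.size()) {
        std::string::size_type end = params.find('&', begin);
        if (end == std::string::npos) {
            end = params.size();
        }
        if (end > begin) {
            const std::string field = params.substr(begin, end - begin);
            const std::string::size_type eq = field.find('=');
            if (eq == std::string::npos || eq == 0) {
                return false;
            }
            out->emplace_back(field.substr(0, eq), field.substr(eq + 1));
        }
        begin = end + 1;
    }
    return true;
}
```

Collecting the pairs first keeps the parsing separate from whatever command string MysqlHandleParams builds from them, which is why the sketch returns a vector rather than writing to `param_cmd` directly.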



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
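+[MySQL](https://www.mysql.com/) is a well-known open-source relational database. To let users access MySQL more conveniently and take full advantage of bthread's concurrency, brpc supports the MySQL protocol directly. Example program: [example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**Note**: Only the text protocol of MySQL 4.1 and later is supported. Transactions are supported; Prepared statement is not supported. The only authentication method currently supported is mysql_native_password, and the single connection mode cannot be used with transactions.
+
+Advantages over [libmysqlclient](https://dev.mysql.com/downloads/connector/c/) (the official client):
+
+- Thread safety. Users do not need to create a separate client for each thread.
+- Supports synchronous, asynchronous, and semi-synchronous access, and can be used with combo channels such as [ParallelChannel](combo_channel.md).
+- Supports multiple [connection types](client.md#连接方式), as well as timeouts, backup requests, cancellation, tracing, builtin services, and other benefits that brpc provides.
+- Explicit return-type checking: receiving a MySQL data type with a variable of the wrong type throws an exception.
+- Calling the standard MySQL library blocks the framework's concurrency, whereas this implementation takes full advantage of it.
+- The brpc MySQL implementation does not block pthreads, while libmysqlclient does (see [threading](bthread.md)), and using MySQL's asynchronous API makes programming much more complicated.
+# Accessing MySQL
+
+Create a Channel for accessing MySQL: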
+
+```c++
+# include <brpc/mysql.h>
+# include <brpc/policy/mysql_authenticator.h>
+# include <brpc/channel.h>
+
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator("yangliming01", "123456", "test",
+    "charset=utf8&collation_connection=utf8_unicode_ci");
+if (channel.Init("127.0.0.1", 3306, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+```
+
+Send a command to MySQL.
+
+```c++
+// Execute any MySQL command; commands can be batched, e.g. "select * from tab1;select * from tab2"
+std::string command = "show databases"; // select,delete,update,insert,create,drop ...
+brpc::MysqlRequest request;
+if (!request.Query(command)) {
+    LOG(ERROR) << "Fail to add command";
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (!cntl.Failed()) {
+    std::cout << response << std::endl;
+} else {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return false;
+}
+return true;
+```
+
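+Notes on the code above:
+
+- The request type must be MysqlRequest and the response type must be MysqlResponse; otherwise CallMethod fails. No stub is needed: call channel.CallMethod directly and pass NULL as the method.
+- Call request.Query() with the command to execute. Commands can be batched, separated by semicolons.
+- Call response.reply(X) in order to retrieve each result, and receive it with the type that matches the reply, e.g. MysqlReply::Ok, MysqlReply::Error, const MysqlReply::Columnconst MysqlReply::Row, etc.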

Review Comment:
   A comma is missing in the middle of `Columnconst`.
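
   The last bullet might also be easier to follow with a short example of walking the replies of a batched command. A rough standalone sketch of the idea follows. `Reply`, `ReplyKind`, `Describe`, and `DescribeAll` are stand-ins made up for illustration and are not the brpc types; the real accessors would be whatever `MysqlResponse` and `MysqlReply` expose.

```c++
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stand-in for the reply kinds named in the bullet; NOT the brpc types.
enum class ReplyKind { kOk, kError, kResultSet };

struct Reply {
    ReplyKind kind;
    uint64_t affected_rows;  // meaningful for kOk
    std::string error_msg;   // meaningful for kError
    uint64_t row_count;      // meaningful for kResultSet
};

// Check the kind first, then read only the fields that belong to it.
std::string Describe(const Reply& r) {
    switch (r.kind) {
        case ReplyKind::kOk:
            return "ok, affected rows: " + std::to_string(r.affected_rows);
        case ReplyKind::kError:
            return "error: " + r.error_msg;
        case ReplyKind::kResultSet:
            return "result set, rows: " + std::to_string(r.row_count);
    }
    return "unknown";
}

// One reply per statement in the batch, visited in order.
std::vector<std::string> DescribeAll(const std::vector<Reply>& replies) {
    std::vector<std::string> out;
    out.reserve(replies.size());
    for (const Reply& r : replies) {
        out.push_back(Describe(r));
    }
    assert(out.size() == replies.size());
    return out;
}
```

   In the real doc the loop would presumably run over `response.reply(i)` and branch on `is_ok()`, `is_error()`, and `is_resultset()`.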



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,

Review Comment:
   Could these functions be declared in mysql_authenticator.h?



##########
example/mysql_c++/mysql_cli.cpp:
##########
@@ -0,0 +1,168 @@
+// Copyright (c) 2014 Baidu, Inc.

Review Comment:
   This needs to be changed to the Apache License header; same for the files below.



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // if stmt_id can't be found on this socket, create the prepared statement on it and
+        // store the user request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());
+            const_cast<AuthContext*>(ctx)->set_group(auth_step[0]);
+        } else {
+            parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+            LOG(ERROR) << "[MYSQL PARSE] wrong pack authentication data";
+        }
+    } else if (msg->response.reply_size() > 0) {
+        for (size_t i = 0; i < msg->response.reply_size(); ++i) {
+            if (!msg->response.reply(i).is_ok()) {
+                LOG(ERROR) << "[MYSQL PARSE] auth failed " << msg->response;
+                parseCode = PARSE_ERROR_NO_RESOURCE;
+                goto END_OF_AUTH;
+            }
+        }
+        std::string params, params_cmd;
+        MysqlParseParams(ctx->user(), &params);
+        if (ctx->group() == auth_step[0] && !params.empty()) {
+            if (MysqlPackParams(params, &params_cmd) == 0) {
+                butil::IOBuf buf;
+                buf.append(params_cmd);
+                buf.cut_into_file_descriptor(socket->fd());
+                const_cast<AuthContext*>(ctx)->set_group(auth_step[1]);
+            } else {
+                parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+                LOG(ERROR) << "[MYSQL PARSE] wrong pack params data";
+            }
+        } else {
+            butil::IOBuf raw_req;
+            raw_req.append(ctx->starter());
+            raw_req.cut_into_file_descriptor(socket->fd());
+            pi->with_auth = false;
+        }
+    } else {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] wrong authentication step";
+    }
+
+END_OF_AUTH:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+ParseError HandlePrepareStatement(const InputResponse* msg,
+                                  const Socket* socket,
+                                  PipelinedInfo* pi) {
+    if (!msg->response.reply(0).is_prepare_ok()) {
+        LOG(ERROR) << "[MYSQL PARSE] response is not prepare ok, " << msg->response;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    const MysqlReply::PrepareOk& ok = msg->response.reply(0).prepare_ok();
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    ParseError parseCode = PARSE_OK;
+    butil::IOBuf buf;
+    butil::Status st;
+    auto stub = ControllerPrivateAccessor(cntl).get_stmt();
+    auto stmt = stub->stmt();
+    if (stmt == NULL) {
+        LOG(ERROR) << "[MYSQL PACK] stmt can't be NULL";
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    if (stmt->param_count() != ok.param_count()) {
+        LOG(ERROR) << "[MYSQL PACK] stmt param number " << stmt->param_count()
+                   << " not equal to prepareOk.param_number " << ok.param_count();
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    stmt->SetStatementId(socket->id(), ok.stmt_id());
+    st = stub->PackExecuteCommand(&buf, ok.stmt_id());
+    if (!st.ok()) {
+        LOG(ERROR) << "[MYSQL PACK] make execute header error " << st;
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    buf.cut_into_file_descriptor(socket->fd());
+    pi->count = MYSQL_PREPARED_STATEMENT;
+END_OF_PREPARE:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+}  // namespace
+
+// "Message" = "Response" as we only implement the client for mysql.
+ParseResult ParseMysqlMessage(butil::IOBuf* source,
+                              Socket* socket,
+                              bool /*read_eof*/,
+                              const void* /*arg*/) {
+    if (source->empty()) {
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+
+    PipelinedInfo pi;
+    if (!socket->PopPipelinedInfo(&pi)) {
+        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
+        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+    }
+
+    InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
+    if (msg == NULL) {
+        msg = new InputResponse;
+        socket->reset_parsing_context(msg);
+    }
+
+    MysqlStmtType stmt_type = static_cast<MysqlStmtType>(pi.count);
+    ParseError err = msg->response.ConsumePartialIOBuf(*source, pi.with_auth, stmt_type);
+    if (FLAGS_mysql_verbose) {
+        LOG(INFO) << "[MYSQL PARSE] " << msg->response;
+    }
+    if (err != PARSE_OK) {
+        if (err == PARSE_ERROR_NOT_ENOUGH_DATA) {
+            socket->GivebackPipelinedInfo(pi);
+        }
+        return MakeParseError(err);
+    }
+    if (pi.with_auth) {
+        ParseError err = HandleAuthentication(msg, socket, &pi);
+        if (err != PARSE_OK) {
+            return MakeParseError(err, "Fail to authenticate with Mysql");
+        }
+        DestroyingPtr<InputResponse> auth_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    if (stmt_type == MYSQL_NEED_PREPARE) {
+        // store stmt_id, make execute header.
+        ParseError err = HandlePrepareStatement(msg, socket, &pi);
+        if (err != PARSE_OK) {
+            return MakeParseError(err, "Fail to make prepared statement with Mysql");
+        }
+        DestroyingPtr<InputResponse> prepare_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    msg->id_wait = pi.id_wait;
+    socket->release_parsing_context();
+    return MakeMessage(msg);
+}
+
+void ProcessMysqlResponse(InputMessageBase* msg_base) {
+    const int64_t start_parse_us = butil::cpuwide_time_us();
+    DestroyingPtr<InputResponse> msg(static_cast<InputResponse*>(msg_base));
+
+    const bthread_id_t cid = msg->id_wait;
+    Controller* cntl = NULL;
+    const int rc = bthread_id_lock(cid, (void**)&cntl);
+    if (rc != 0) {
+        LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
+            << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
+        return;
+    }
+
+    ControllerPrivateAccessor accessor(cntl);
+    Span* span = accessor.span();
+    if (span) {
+        span->set_base_real_us(msg->base_real_us());
+        span->set_received_us(msg->received_us());
+        span->set_response_size(msg->response.ByteSize());
+        span->set_start_parse_us(start_parse_us);
+    }
+    const int saved_error = cntl->ErrorCode();
+    if (cntl->response() != NULL) {
+        if (cntl->response()->GetDescriptor() != MysqlResponse::descriptor()) {
+            cntl->SetFailed(ERESPONSE, "Must be MysqlResponse");
+        } else {
+            // We work around ParseFrom of pb which is just a placeholder.
+            ((MysqlResponse*)cntl->response())->Swap(&msg->response);
+        }
+    }  // silently ignore the response.
+
+    // Unlocks correlation_id inside. Revert controller's
+    // error code if the version check of `cid' fails
+    msg.reset();  // optional, just release resource ASAP
+    accessor.OnResponse(cid, saved_error);
+}
+
+void SerializeMysqlRequest(butil::IOBuf* buf,
+                           Controller* cntl,
+                           const google::protobuf::Message* request) {
+    if (request == NULL) {
+        return cntl->SetFailed(EREQUEST, "request is NULL");
+    }
+    if (request->GetDescriptor() != MysqlRequest::descriptor()) {
+        return cntl->SetFailed(EREQUEST, "The request is not a MysqlRequest");
+    }
+    const MysqlRequest* rr = (const MysqlRequest*)request;
+    // We work around SerializeTo of pb which is just a placeholder.
+    if (!rr->SerializeTo(buf)) {
+        return cntl->SetFailed(EREQUEST, "Fail to serialize MysqlRequest");
+    }
+    // mysql protocol doesn't use pipelined count to verify the end of a response, so pipelined
+    // count is meaningless, but we can use it to help us distinguish mysql reply type. In mysql
+    // protocol, we can't distinguish OK and PreparedOk, so we set pipelined count to 2 to let the
+    // parse function parse PreparedOk reply
+    ControllerPrivateAccessor accessor(cntl);
+    accessor.set_pipelined_count(MYSQL_NORMAL_STATEMENT);
+
+    auto tx = rr->get_tx();
+    if (tx != NULL) {
+        accessor.use_bind_sock(tx->GetSocketId());
+    }
+    auto st = rr->get_stmt();
+    if (st != NULL) {
+        accessor.set_stmt(st);
+        accessor.set_pipelined_count(MYSQL_PREPARED_STATEMENT);
+    }
+    if (FLAGS_mysql_verbose) {
+        LOG(INFO) << "\n[MYSQL REQUEST] " << *rr;
+    }
+}
+
+void PackMysqlRequest(butil::IOBuf* buf,
+                      SocketMessage**,
+                      uint64_t /*correlation_id*/,
+                      const google::protobuf::MethodDescriptor*,
+                      Controller* cntl,
+                      const butil::IOBuf& request,
+                      const Authenticator* auth) {
+    ControllerPrivateAccessor accessor(cntl);
+    if (auth) {
+        const MysqlAuthenticator* my_auth(dynamic_cast<const MysqlAuthenticator*>(auth));
+        if (my_auth == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] there is not MysqlAuthenticator";
+            return;
+        }
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return;
+        }
+        AuthContext* ctx = sock->mutable_auth_context();
+        // std::string params;

Review Comment:
   This can be deleted, right?



##########
example/mysql_c++/mysql_go_press.go:
##########
@@ -0,0 +1,63 @@
+package main

Review Comment:
   Please add a license header.



##########
src/brpc/policy/mysql_auth_hash.h:
##########
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2.0, as
+ * published by the Free Software Foundation.
+ *
+ * This program is also distributed with certain software (including
+ * but not limited to OpenSSL) that is licensed under separate terms,
+ * as designated in a particular file or component or in included license
+ * documentation.  The authors of MySQL hereby grant you an
+ * additional permission to link the program and your derivative works
+ * with the separately licensed software that they have included with
+ * MySQL.
+ *
+ * Without limiting anything contained in the foregoing, this file,
+ * which is part of MySQL Connector/C++, is also subject to the
+ * Universal FOSS Exception, version 1.0, a copy of which can be found at
+ * http://oss.oracle.com/licenses/universal-foss-exception.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License, version 2.0, for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef BRPC_POLICY_MYSQL_AUTH_HASH_H
+#define BRPC_POLICY_MYSQL_AUTH_HASH_H
+
+// #include <mysql/cdk/config.h>
+#include <string>
+
+namespace brpc {
+namespace policy {
+
+std::string mysql_build_mysql41_authentication_response(const std::string& salt_data,
+                                                        const std::string& password);
+
+std::string mysql_build_sha256_authentication_response(const std::string& salt_data,

Review Comment:
   Where is this used?
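
   For context: the docs in this PR list mysql_native_password as the only supported authentication method, and as far as I know that scramble needs SHA-1 only, namely `SHA1(password) XOR SHA1(salt + SHA1(SHA1(password)))`. Below is a standalone sketch of that computation, with a hand-rolled SHA-1 so it builds without OpenSSL. `Sha1`, `NativePasswordScramble`, and `ServerAccepts` are hypothetical names for illustration; they are not the functions declared in this header, and the PR's implementation may differ.

```c++
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

typedef std::array<uint8_t, 20> Digest;

static uint32_t Rotl(uint32_t x, int n) { return (x << n) | (x >> (32 - n)); }

// Plain SHA-1 (FIPS 180-4), written out so the sketch has no dependencies.
Digest Sha1(const std::string& input) {
    uint32_t h[5] = {0x67452301u, 0xEFCDAB89u, 0x98BADCFEu, 0x10325476u, 0xC3D2E1F0u};
    // Pad: 0x80, zeros up to 56 mod 64, then the bit length as 8 big-endian bytes.
    std::string msg = input;
    const uint64_t bit_len = static_cast<uint64_t>(input.size()) * 8;
    msg.push_back(static_cast<char>(0x80));
    while (msg.size() % 64 != 56) msg.push_back('\0');
    for (int i = 7; i >= 0; --i) {
        msg.push_back(static_cast<char>((bit_len >> (8 * i)) & 0xFF));
    }
    for (size_t off = 0; off < msg.size(); off += 64) {
        uint32_t w[80];
        for (int i = 0; i < 16; ++i) {
            w[i] = 0;
            for (int j = 0; j < 4; ++j) {
                w[i] = (w[i] << 8) | static_cast<uint8_t>(msg[off + 4 * i + j]);
            }
        }
        for (int i = 16; i < 80; ++i) {
            w[i] = Rotl(w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16], 1);
        }
        uint32_t a = h[0], b = h[1], c = h[2], d = h[3], e = h[4];
        for (int i = 0; i < 80; ++i) {
            uint32_t f, k;
            if (i < 20) {
                f = (b & c) | (~b & d);
                k = 0x5A827999u;
            } else if (i < 40) {
                f = b ^ c ^ d;
                k = 0x6ED9EBA1u;
            } else if (i < 60) {
                f = (b & c) | (b & d) | (c & d);
                k = 0x8F1BBCDCu;
            } else {
                f = b ^ c ^ d;
                k = 0xCA62C1D6u;
            }
            const uint32_t t = Rotl(a, 5) + f + e + k + w[i];
            e = d; d = c; c = Rotl(b, 30); b = a; a = t;
        }
        h[0] += a; h[1] += b; h[2] += c; h[3] += d; h[4] += e;
    }
    Digest out;
    for (int i = 0; i < 20; ++i) {
        out[i] = static_cast<uint8_t>(h[i / 4] >> (24 - 8 * (i % 4)));
    }
    return out;
}

static std::string Bytes(const Digest& d) { return std::string(d.begin(), d.end()); }

// Client side: SHA1(password) XOR SHA1(salt + SHA1(SHA1(password))).
// An empty password is sent as an empty token.
std::string NativePasswordScramble(const std::string& salt, const std::string& password) {
    if (password.empty()) return std::string();
    const Digest stage1 = Sha1(password);
    const Digest mix = Sha1(salt + Bytes(Sha1(Bytes(stage1))));
    std::string token(20, '\0');
    for (size_t i = 0; i < 20; ++i) token[i] = static_cast<char>(stage1[i] ^ mix[i]);
    assert(token.size() == 20);
    return token;
}

// Server side, which only stores SHA1(SHA1(password)): undo the XOR, hash once, compare.
bool ServerAccepts(const std::string& salt, const std::string& token,
                   const std::string& password) {
    if (token.size() != 20) return false;
    const std::string stored = Bytes(Sha1(Bytes(Sha1(password))));
    const Digest mix = Sha1(salt + stored);
    std::string stage1(20, '\0');
    for (size_t i = 0; i < 20; ++i) {
        stage1[i] = static_cast<char>(static_cast<uint8_t>(token[i]) ^ mix[i]);
    }
    return Bytes(Sha1(stage1)) == stored;
}
```

   Nothing in this path needs SHA-256, which is why the sha256 declaration looks unused to me unless another auth plugin is planned.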



##########
src/brpc/mysql_reply.h:
##########
@@ -0,0 +1,801 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#ifndef BRPC_MYSQL_REPLY_H
+#define BRPC_MYSQL_REPLY_H
+
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/arena.h"
+#include "butil/sys_byteorder.h"
+#include "butil/logging.h"  // LOG()
+#include "brpc/parse_result.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+
+class CheckParsed {
+public:
+    CheckParsed() : _is_parsed(false) {}
+    bool is_parsed() const {
+        return _is_parsed;
+    }
+    void set_parsed() {
+        _is_parsed = true;
+    }
+
+private:
+    bool _is_parsed;
+};
+
+enum MysqlRspType : uint8_t {
+    MYSQL_RSP_OK = 0x00,
+    MYSQL_RSP_ERROR = 0xFF,
+    MYSQL_RSP_RESULTSET = 0x01,
+    MYSQL_RSP_EOF = 0xFE,
+    MYSQL_RSP_AUTH = 0xFB,        // add for mysql auth
+    MYSQL_RSP_PREPARE_OK = 0xFC,  // add for prepared statement
+    MYSQL_RSP_UNKNOWN = 0xFD,     // add for other case
+};
+
+const char* MysqlRspTypeToString(MysqlRspType);
+
+class MysqlReply {
+public:
+    // Mysql Auth package
+    class Auth : private CheckParsed {
+    public:
+        Auth();
+        uint8_t protocol() const;
+        butil::StringPiece version() const;
+        uint32_t thread_id() const;
+        butil::StringPiece salt() const;
+        uint16_t capability() const;
+        uint8_t collation() const;
+        uint16_t status() const;
+        uint16_t extended_capability() const;
+        uint8_t auth_plugin_length() const;
+        butil::StringPiece salt2() const;
+        butil::StringPiece auth_plugin() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Auth);
+        friend class MysqlReply;
+
+        uint8_t _protocol;
+        butil::StringPiece _version;
+        uint32_t _thread_id;
+        butil::StringPiece _salt;
+        uint16_t _capability;
+        uint8_t _collation;
+        uint16_t _status;
+        uint16_t _extended_capability;
+        uint8_t _auth_plugin_length;
+        butil::StringPiece _salt2;
+        butil::StringPiece _auth_plugin;
+    };
+    // Mysql Prepared Statement Ok
+    class Column;
+    // Mysql Eof package
+    class Eof : private CheckParsed {
+    public:
+        Eof();
+        uint16_t warning() const;
+        uint16_t status() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf);
+
+        DISALLOW_COPY_AND_ASSIGN(Eof);
+        friend class MysqlReply;
+
+        uint16_t _warning;
+        uint16_t _status;
+    };
+    // Mysql PrepareOk package
+    class PrepareOk : private CheckParsed {
+    public:
+        PrepareOk();
+        uint32_t stmt_id() const;
+        uint16_t column_count() const;
+        uint16_t param_count() const;
+        uint16_t warning() const;
+        const Column& param(uint16_t index) const;
+        const Column& column(uint16_t index) const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(PrepareOk);
+        friend class MysqlReply;
+
+        class Header : private CheckParsed {
+        public:
+            Header() : _stmt_id(0), _column_count(0), _param_count(0), _warning(0) {}
+            uint32_t _stmt_id;
+            uint16_t _column_count;
+            uint16_t _param_count;
+            uint16_t _warning;
+            ParseError Parse(butil::IOBuf& buf);
+        };
+        Header _header;
+        Column* _params;
+        Eof _eof1;
+        Column* _columns;
+        Eof _eof2;
+    };
+    // Mysql Ok package
+    class Ok : private CheckParsed {
+    public:
+        Ok();
+        uint64_t affect_row() const;
+        uint64_t index() const;
+        uint16_t status() const;
+        uint16_t warning() const;
+        butil::StringPiece msg() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Ok);
+        friend class MysqlReply;
+
+        uint64_t _affect_row;
+        uint64_t _index;
+        uint16_t _status;
+        uint16_t _warning;
+        butil::StringPiece _msg;
+    };
+    // Mysql Error package
+    class Error : private CheckParsed {
+    public:
+        Error();
+        uint16_t errcode() const;
+        butil::StringPiece status() const;
+        butil::StringPiece msg() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Error);
+        friend class MysqlReply;
+
+        uint16_t _errcode;
+        butil::StringPiece _status;
+        butil::StringPiece _msg;
+    };
+    // Mysql Column
+    class Column : private CheckParsed {
+    public:
+        Column();
+        butil::StringPiece catalog() const;
+        butil::StringPiece database() const;
+        butil::StringPiece table() const;
+        butil::StringPiece origin_table() const;
+        butil::StringPiece name() const;
+        butil::StringPiece origin_name() const;
+        uint16_t charset() const;
+        uint32_t length() const;
+        MysqlFieldType type() const;
+        MysqlFieldFlag flag() const;
+        uint8_t decimal() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Column);
+        friend class MysqlReply;
+
+        butil::StringPiece _catalog;
+        butil::StringPiece _database;
+        butil::StringPiece _table;
+        butil::StringPiece _origin_table;
+        butil::StringPiece _name;
+        butil::StringPiece _origin_name;
+        uint16_t _charset;
+        uint32_t _length;
+        MysqlFieldType _type;
+        MysqlFieldFlag _flag;
+        uint8_t _decimal;
+    };
+    // Mysql Field
+    class Field : private CheckParsed {
+    public:
+        Field();
+        int8_t stiny() const;
+        uint8_t tiny() const;
+        int16_t ssmall() const;
+        uint16_t small() const;
+        int32_t sinteger() const;
+        uint32_t integer() const;
+        int64_t sbigint() const;
+        uint64_t bigint() const;
+        float float32() const;
+        double float64() const;
+        butil::StringPiece string() const;
+        bool is_stiny() const;
+        bool is_tiny() const;
+        bool is_ssmall() const;
+        bool is_small() const;
+        bool is_sinteger() const;
+        bool is_integer() const;
+        bool is_sbigint() const;
+        bool is_bigint() const;
+        bool is_float32() const;
+        bool is_float64() const;
+        bool is_string() const;
+        bool is_nil() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, const MysqlReply::Column* column, butil::Arena* arena);
+        ParseError Parse(butil::IOBuf& buf,
+                         const MysqlReply::Column* column,
+                         uint64_t column_index,
+                         uint64_t column_number,
+                         const uint8_t* null_mask,
+                         butil::Arena* arena);
+        ParseError ParseBinaryTime(butil::IOBuf& buf,
+                                   const MysqlReply::Column* column,
+                                   butil::StringPiece& str,
+                                   butil::Arena* arena);
+        ParseError ParseBinaryDataTime(butil::IOBuf& buf,
+                                       const MysqlReply::Column* column,
+                                       butil::StringPiece& str,
+                                       butil::Arena* arena);
+        ParseError ParseMicrosecs(butil::IOBuf& buf, uint8_t decimal, char* d);
+        DISALLOW_COPY_AND_ASSIGN(Field);
+        friend class MysqlReply;
+
+        union {
+            int8_t stiny;
+            uint8_t tiny;
+            int16_t ssmall;
+            uint16_t small;
+            int32_t sinteger;
+            uint32_t integer;
+            int64_t sbigint;
+            uint64_t bigint;
+            float float32;
+            double float64;
+            butil::StringPiece str;
+        } _data = {.str = NULL};
+        MysqlFieldType _type;
+        bool _unsigned;
+        bool _is_nil;
+    };
+    // Mysql Row
+    class Row : private CheckParsed {
+    public:
+        Row();
+        uint64_t field_count() const;
+        const Field& field(const uint64_t index) const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf,
+                         const Column* columns,
+                         uint64_t column_number,
+                         Field* fields,
+                         bool binary,
+                         butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Row);
+        friend class MysqlReply;
+
+        Field* _fields;
+        uint64_t _field_count;
+        Row* _next;
+    };
+
+public:
+    MysqlReply();
+    ParseError ConsumePartialIOBuf(butil::IOBuf& buf,
+                                   butil::Arena* arena,
+                                   bool is_auth,
+                                   MysqlStmtType stmt_type,
+                                   bool* more_results);
+    void Swap(MysqlReply& other);
+    void Print(std::ostream& os) const;
+    // response type
+    MysqlRspType type() const;
+    // get auth
+    const Auth& auth() const;
+    const Ok& ok() const;
+    const PrepareOk& prepare_ok() const;
+    const Error& error() const;
+    const Eof& eof() const;
+    // get column number
+    uint64_t column_count() const;
+    // get one column
+    const Column& column(const uint64_t index) const;
+    // get row number
+    uint64_t row_count() const;
+    // get one row
+    const Row& next() const;
+    bool is_auth() const;
+    bool is_ok() const;
+    bool is_prepare_ok() const;
+    bool is_error() const;
+    bool is_eof() const;
+    bool is_resultset() const;
+
+private:
+    // Mysql result set header
+    struct ResultSetHeader : private CheckParsed {
+        ResultSetHeader() : _column_count(0), _extra_msg(0) {}
+        ParseError Parse(butil::IOBuf& buf);
+        uint64_t _column_count;
+        uint64_t _extra_msg;
+
+    private:
+        DISALLOW_COPY_AND_ASSIGN(ResultSetHeader);
+    };
+    // Mysql result set
+    struct ResultSet : private CheckParsed {
+        ResultSet() : _columns(NULL), _row_count(0) {
+            _cur = _first = _last = &_dummy;
+        }
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena, bool binary);
+        ResultSetHeader _header;
+        Column* _columns;
+        Eof _eof1;
+        // row list begin
+        Row* _first;
+        Row* _last;
+        Row* _cur;
+        uint64_t _row_count;
+        // row list end
+        Eof _eof2;
+
+    private:
+        DISALLOW_COPY_AND_ASSIGN(ResultSet);
+        Row _dummy;
+    };
+    // member values
+    MysqlRspType _type;
+    union {
+        Auth* auth;
+        ResultSet* result_set;
+        Ok* ok;
+        PrepareOk* prepare_ok;
+        Error* error;
+        Eof* eof;
+        uint64_t padding;  // For swapping, must cover all bytes.
+    } _data;
+
+    DISALLOW_COPY_AND_ASSIGN(MysqlReply);
+};
+
+// mysql reply
+inline MysqlReply::MysqlReply() {
+    _type = MYSQL_RSP_UNKNOWN;
+    _data.padding = 0;
+}
+inline void MysqlReply::Swap(MysqlReply& other) {

Review Comment:
   Add a blank line here; same for the definitions below.
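
   Roughly the layout I have in mind, shown on a standalone stand-in class with one blank line between consecutive inline definitions. `TinyReply`, `OkBody`, `ErrorBody`, and `RspType` are made up for this sketch. It also swaps the whole union object instead of going through a `padding` member, which is an assumption on my side, since the hunk is cut off before the body of `Swap`.

```c++
#include <cassert>
#include <cstdint>
#include <utility>

// Stand-ins for illustration only; not the classes in mysql_reply.h.
enum RspType : uint8_t { RSP_OK = 0x00, RSP_ERROR = 0xFF, RSP_UNKNOWN = 0xFD };

struct OkBody { uint64_t affected_rows; };
struct ErrorBody { uint16_t errcode; };

class TinyReply {
public:
    TinyReply();
    void Swap(TinyReply& other);
    void set_ok(OkBody* ok);
    void set_error(ErrorBody* error);
    RspType type() const;
    const OkBody* ok() const;
    const ErrorBody* error() const;

private:
    union Payload {
        OkBody* ok;
        ErrorBody* error;
        uint64_t padding;  // zeroed in the constructor so no payload byte starts undefined
    };
    RspType _type;
    Payload _data;
};

inline TinyReply::TinyReply() : _type(RSP_UNKNOWN) {
    _data.padding = 0;
}

inline void TinyReply::Swap(TinyReply& other) {
    std::swap(_type, other._type);
    // Copying the union object moves every byte, whichever member is active.
    std::swap(_data, other._data);
}

inline void TinyReply::set_ok(OkBody* ok) {
    assert(ok != nullptr);
    _type = RSP_OK;
    _data.ok = ok;
}

inline void TinyReply::set_error(ErrorBody* error) {
    assert(error != nullptr);
    _type = RSP_ERROR;
    _data.error = error;
}

inline RspType TinyReply::type() const {
    return _type;
}

inline const OkBody* TinyReply::ok() const {
    return _type == RSP_OK ? _data.ok : nullptr;
}

inline const ErrorBody* TinyReply::error() const {
    return _type == RSP_ERROR ? _data.error : nullptr;
}
```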



##########
src/brpc/details/controller_private_accessor.h:
##########
@@ -152,6 +152,22 @@ class ControllerPrivateAccessor {
         return *this;
     }
 
+    // Set bind socket action
+    void set_bind_sock_action(BindSockAction action) { _cntl->_bind_sock_action = action; }
+    // Transfer ownership to other
+    void get_bind_sock(SocketUniquePtr* ptr) {
+        _cntl->_bind_sock->ReAddress(ptr);
+    }
+    // Use a external socket
+    void use_bind_sock(SocketId sock_id) { 
+        _cntl->_bind_sock_action = BIND_SOCK_USE;
+        Socket::Address(sock_id, &_cntl->_bind_sock);
+    }
+    // set prepare statement
+    void set_stmt(MysqlStatementStub *stmt) { _cntl->_stmt = stmt; }
+    // get prepare statement
+    MysqlStatementStub* get_stmt() { return _cntl->_stmt; }

Review Comment:
   mysql_stmt()



##########
src/brpc/details/controller_private_accessor.h:
##########
@@ -152,6 +152,22 @@ class ControllerPrivateAccessor {
         return *this;
     }
 
+    // Set bind socket action
+    void set_bind_sock_action(BindSockAction action) { _cntl->_bind_sock_action = action; }
+    // Transfer ownership to other
+    void get_bind_sock(SocketUniquePtr* ptr) {
+        _cntl->_bind_sock->ReAddress(ptr);
+    }
+    // Use a external socket
+    void use_bind_sock(SocketId sock_id) { 
+        _cntl->_bind_sock_action = BIND_SOCK_USE;
+        Socket::Address(sock_id, &_cntl->_bind_sock);
+    }
+    // set prepare statement
+    void set_stmt(MysqlStatementStub *stmt) { _cntl->_stmt = stmt; }

Review Comment:
   set_mysql_stmt()
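
   A minimal sketch of the naming suggested in this comment and the previous one, assuming the intent is to make the MySQL-specific accessors stand out among the protocol-agnostic ones. `FakeController` and `FakeAccessor` are hypothetical stand-ins so the snippet compiles on its own; they are not brpc classes.

   ```cpp
   #include <cassert>

   // Stand-in for brpc::MysqlStatementStub; hypothetical, illustration only.
   struct MysqlStatementStub {};

   // Stand-in for brpc::Controller holding the statement pointer.
   struct FakeController {
       MysqlStatementStub* _stmt = nullptr;
   };

   // Stand-in for ControllerPrivateAccessor using the suggested names.
   class FakeAccessor {
   public:
       explicit FakeAccessor(FakeController* cntl) : _cntl(cntl) {
           assert(cntl != nullptr);
       }
       // Was set_stmt() in the diff.
       void set_mysql_stmt(MysqlStatementStub* stmt) { _cntl->_stmt = stmt; }
       // Was get_stmt() in the diff.
       MysqlStatementStub* mysql_stmt() const { return _cntl->_stmt; }

   private:
       FakeController* _cntl;
   };
   ```

   Call sites such as `accessor.get_stmt()` in mysql_protocol.cpp would then read `accessor.mysql_stmt()`.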



##########
src/brpc/mysql.h:
##########
@@ -0,0 +1,286 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#ifndef BRPC_MYSQL_H
+#define BRPC_MYSQL_H
+
+#include <string>
+#include <vector>
+#include <google/protobuf/stubs/common.h>
+
+#include <google/protobuf/generated_message_util.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include "google/protobuf/descriptor.pb.h"
+
+#include "butil/iobuf.h"
+#include "butil/strings/string_piece.h"
+#include "butil/arena.h"
+#include "parse_result.h"
+#include "mysql_command.h"
+#include "mysql_reply.h"
+#include "mysql_transaction.h"
+#include "mysql_statement.h"
+
+namespace brpc {
+// Request to mysql.
+// Notice that you can pipeline multiple commands in one request and sent
+// them to ONE mysql-server together.
+// Example:
+//   MysqlRequest request;
+//   request.Query("select * from table");
+//   MysqlResponse response;
+//   channel.CallMethod(NULL, &controller, &request, &response, NULL/*done*/);
+//   if (!cntl.Failed()) {
+//       LOG(INFO) << response.reply(0);
+//   }
+
+class MysqlStatementStub {
+public:
+    MysqlStatementStub(MysqlStatement* stmt);
+    MysqlStatement* stmt();
+    butil::IOBuf& execute_data();
+    butil::Status PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id);
+    // prepare statement null mask
+    struct NullMask {
+        NullMask() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> mask;
+        butil::IOBuf::Area area;
+    };
+    // prepare statement param types
+    struct ParamTypes {
+        ParamTypes() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> types;
+        butil::IOBuf::Area area;
+    };
+    // null mask and param types
+    NullMask& null_mask();
+    ParamTypes& param_types();
+    // save long data
+    void save_long_data(uint16_t param_id, const butil::StringPiece& value);
+
+private:
+    MysqlStatement* _stmt;
+    butil::IOBuf _execute_data;
+    NullMask _null_mask;
+    ParamTypes _param_types;
+    // long data
+    struct LongData {
+        uint16_t param_id;
+        butil::IOBuf long_data;
+    };
+    std::vector<LongData> _long_data;
+};
+
+inline MysqlStatementStub::MysqlStatementStub(MysqlStatement* stmt) : _stmt(stmt) {}
+
+inline MysqlStatement* MysqlStatementStub::stmt() {
+    return _stmt;
+}
+
+inline butil::IOBuf& MysqlStatementStub::execute_data() {
+    return _execute_data;
+}
+
+inline MysqlStatementStub::NullMask& MysqlStatementStub::null_mask() {
+    return _null_mask;
+}
+
+inline MysqlStatementStub::ParamTypes& MysqlStatementStub::param_types() {
+    return _param_types;
+}
+
+inline void MysqlStatementStub::save_long_data(uint16_t param_id, const butil::StringPiece& value) {
+    LongData d;
+    d.param_id = param_id;
+    d.long_data.append(value.data(), value.size());
+    _long_data.push_back(d);
+}
+
+class MysqlRequest : public ::google::protobuf::Message {
+public:
+    MysqlRequest();
+    MysqlRequest(const MysqlTransaction* tx);
+    MysqlRequest(MysqlStatement* stmt);
+    MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt);
+    virtual ~MysqlRequest();
+    MysqlRequest(const MysqlRequest& from);
+    inline MysqlRequest& operator=(const MysqlRequest& from) {
+        CopyFrom(from);
+        return *this;
+    }
+    void Swap(MysqlRequest* other);
+
+    // Serialize the request into `buf'. Return true on success.
+    bool SerializeTo(butil::IOBuf* buf) const;
+
+    // Protobuf methods.
+    MysqlRequest* New() const;

Review Comment:
   For protobuf 3.6 and above, an overload New(::google::protobuf::Arena* arena) needs to be added.
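
   A rough sketch of the shape of that overload. To keep it compilable without protobuf, `Arena` and `Message` below are simplified, hypothetical stand-ins for the `::google::protobuf` classes, and the ownership handling is an assumption for illustration. In the real header the new declaration would presumably sit behind a guard such as `#if GOOGLE_PROTOBUF_VERSION >= 3006000`, in the same style as the version check already used in mysql.cpp.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <memory>
   #include <vector>

   class Arena;  // stand-in for ::google::protobuf::Arena (hypothetical)

   // Stand-in for ::google::protobuf::Message with both New() overloads.
   class Message {
   public:
       virtual ~Message() {}
       virtual Message* New() const = 0;
       virtual Message* New(Arena* arena) const = 0;
   };

   // Toy arena that simply owns the messages registered with it.
   class Arena {
   public:
       Message* Own(Message* msg) {
           assert(msg != nullptr);
           _owned.emplace_back(msg);
           return msg;
       }
       std::size_t owned_count() const { return _owned.size(); }

   private:
       std::vector<std::unique_ptr<Message>> _owned;
   };

   class MysqlRequest : public Message {
   public:
       // Existing overload: the caller owns the result.
       MysqlRequest* New() const override { return new MysqlRequest; }

       // Added overload: with a non-null arena, the arena owns the result.
       MysqlRequest* New(Arena* arena) const override {
           MysqlRequest* req = new MysqlRequest;
           if (arena != nullptr) {
               arena->Own(req);
           }
           return req;
       }
   };
   ```

   MysqlResponse would need the same pair of overloads.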



##########
src/brpc/mysql.cpp:
##########
@@ -0,0 +1,695 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <google/protobuf/stubs/once.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/wire_format_lite_inl.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/reflection_ops.h>
+#include <google/protobuf/wire_format.h>
+#include "butil/string_printf.h"
+#include "butil/macros.h"
+#include "brpc/controller.h"
+#include "brpc/mysql.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+
+DEFINE_int32(mysql_multi_replies_size, 10, "multi replies size in one MysqlResponse");
+
+// Internal implementation detail -- do not call these.
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl();
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto();
+
+namespace {
+
+const ::google::protobuf::Descriptor* MysqlRequest_descriptor_ = NULL;
+const ::google::protobuf::Descriptor* MysqlResponse_descriptor_ = NULL;
+
+}  // namespace
+
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    const ::google::protobuf::FileDescriptor* file =
+        ::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
+            "baidu/rpc/mysql_base.proto");
+    GOOGLE_CHECK(file != NULL);
+    MysqlRequest_descriptor_ = file->message_type(0);
+    MysqlResponse_descriptor_ = file->message_type(1);
+}
+
+namespace {
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
+inline void protobuf_AssignDescriptorsOnce() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
+                                       &protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+void protobuf_RegisterTypes(const ::std::string&) {
+    protobuf_AssignDescriptorsOnce();
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlRequest_descriptor_, &MysqlRequest::default_instance());
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlResponse_descriptor_, &MysqlResponse::default_instance());
+}
+
+}  // namespace
+
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    delete MysqlRequest::default_instance_;
+    delete MysqlResponse::default_instance_;
+}
+
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl() {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+#if GOOGLE_PROTOBUF_VERSION >= 3002000
+    ::google::protobuf::internal::InitProtobufDefaults();
+#else
+    ::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
+#endif
+    ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
+        "\n\032baidu/rpc/mysql_base.proto\022\tbaidu.rpc\032"
+        " google/protobuf/descriptor.proto\"\016\n\014Mys"
+        "qlRequest\"\017\n\rMysqlResponseB\003\200\001\001",
+        111);
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile("baidu/rpc/mysql_base.proto",
+                                                                      &protobuf_RegisterTypes);
+    MysqlRequest::default_instance_ = new MysqlRequest();
+    MysqlResponse::default_instance_ = new MysqlResponse();
+    MysqlRequest::default_instance_->InitAsDefaultInstance();
+    MysqlResponse::default_instance_->InitAsDefaultInstance();
+    ::google::protobuf::internal::OnShutdown(
+        &protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once);
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once,
+                                       &protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl);
+}
+
+// Force AddDescriptors() to be called at static initialization time.
+struct StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto {
+    StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto() {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+} static_descriptor_initializer_baidu_2frpc_2fmysql_5fbase_2eproto_;
+
+
+// ===================================================================
+
+#ifndef _MSC_VER
+#endif  // !_MSC_VER
+
+butil::Status MysqlStatementStub::PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id) {
+    butil::Status st;
+    // long data
+    for (const auto& i : _long_data) {
+        st = MysqlMakeLongDataPacket(outbuf, stmt_id, i.param_id, i.long_data);
+        if (!st.ok()) {
+            LOG(ERROR) << "make long data header error " << st;
+            return st;
+        }
+    }
+    _long_data.clear();
+    // execute data
+    st = MysqlMakeExecutePacket(outbuf, stmt_id, _execute_data);
+    if (!st.ok()) {
+        LOG(ERROR) << "make execute header error " << st;
+        return st;
+    }
+    _execute_data.clear();
+    _null_mask.mask.clear();
+    _null_mask.area = butil::IOBuf::INVALID_AREA;
+    _param_types.types.clear();
+    _param_types.area = butil::IOBuf::INVALID_AREA;
+
+    return st;
+}
+
+MysqlRequest::MysqlRequest() : ::google::protobuf::Message() {
+    SharedCtor();
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx) : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+}
+
+MysqlRequest::MysqlRequest(MysqlStatement* stmt) : ::google::protobuf::Message() {
+    SharedCtor();
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt)
+    : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+void MysqlRequest::InitAsDefaultInstance() {}
+
+MysqlRequest::MysqlRequest(const MysqlRequest& from) : ::google::protobuf::Message() {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+void MysqlRequest::SharedCtor() {
+    _has_error = false;
+    _cached_size_ = 0;
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+    _param_index = 0;
+}
+
+MysqlRequest::~MysqlRequest() {
+    SharedDtor();
+    if (_stmt != NULL) {
+        delete _stmt;
+    }
+    _stmt = NULL;
+}
+
+void MysqlRequest::SharedDtor() {
+    if (this != default_instance_) {
+    }
+}
+
+void MysqlRequest::SetCachedSize(int size) const {
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+}
+const ::google::protobuf::Descriptor* MysqlRequest::descriptor() {
+    protobuf_AssignDescriptorsOnce();
+    return MysqlRequest_descriptor_;
+}
+
+const MysqlRequest& MysqlRequest::default_instance() {
+    if (default_instance_ == NULL) {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+    return *default_instance_;
+}
+
+MysqlRequest* MysqlRequest::default_instance_ = NULL;
+
+MysqlRequest* MysqlRequest::New() const {
+    return new MysqlRequest;
+}
+
+void MysqlRequest::Clear() {
+    _has_error = false;
+    _buf.clear();
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+}
+
+bool MysqlRequest::MergePartialFromCodedStream(::google::protobuf::io::CodedInputStream*) {
+    LOG(WARNING) << "You're not supposed to parse a MysqlRequest";
+    return true;
+}
+
+void MysqlRequest::SerializeWithCachedSizes(::google::protobuf::io::CodedOutputStream*) const {
+    LOG(WARNING) << "You're not supposed to serialize a MysqlRequest";
+}
+
+::google::protobuf::uint8* MysqlRequest::SerializeWithCachedSizesToArray(
+    ::google::protobuf::uint8* target) const {
+    return target;
+}
+
+int MysqlRequest::ByteSize() const {
+    int total_size = _buf.size();
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = total_size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+    return total_size;
+}
+
+void MysqlRequest::MergeFrom(const ::google::protobuf::Message& from) {
+    GOOGLE_CHECK_NE(&from, this);
+    const MysqlRequest* source =
+        ::google::protobuf::internal::dynamic_cast_if_available<const MysqlRequest*>(&from);
+    if (source == NULL) {
+        ::google::protobuf::internal::ReflectionOps::Merge(from, this);
+    } else {
+        MergeFrom(*source);
+    }
+}
+
+void MysqlRequest::MergeFrom(const MysqlRequest& from) {
+    // TODO: maybe need to optimize
+    // GOOGLE_CHECK_NE(&from, this);
+    // const int header_size = 4;
+    // const uint32_t size_l = from._buf.size() - header_size - 1;  // payload - type
+    // const uint32_t size_r = _buf.size() - header_size + 1;       // payload + seqno
+    // const uint32_t payload_size = butil::ByteSwapToLE32(size_l + size_r);
+    // if (payload_size > mysql_max_package_size) {
+    //     CHECK(false)
+    //         << "[MysqlRequest::MergeFrom] statement size is too big, merge from do nothing";
+    //     return;
+    // }
+    // butil::IOBuf buf;
+    // butil::IOBuf result;
+    // _has_error = _has_error || from._has_error;
+    // buf.append(from._buf);
+    // buf.pop_front(header_size + 1);
+    // _buf.pop_front(header_size - 1);
+    // result.append(&payload_size, 3);
+    // result.append(_buf);
+    // result.append(buf);
+    // _buf = result;
+    // _has_command = _has_command || from._has_command;
+}
+
+void MysqlRequest::CopyFrom(const ::google::protobuf::Message& from) {
+    if (&from == this)
+        return;
+    Clear();
+    MergeFrom(from);
+}
+
+void MysqlRequest::CopyFrom(const MysqlRequest& from) {
+    if (&from == this)
+        return;
+    Clear();
+    MergeFrom(from);
+}
+
+void MysqlRequest::Swap(MysqlRequest* other) {
+    if (other != this) {
+        _buf.swap(other->_buf);
+        std::swap(_has_error, other->_has_error);
+        std::swap(_cached_size_, other->_cached_size_);
+        std::swap(_has_command, other->_has_command);
+    }
+}
+
+bool MysqlRequest::SerializeTo(butil::IOBuf* buf) const {
+    if (_has_error) {
+        LOG(ERROR) << "Reject serialization due to error in CommandXXX[V]";
+        return false;
+    }
+    *buf = _buf;
+    return true;
+}
+
+::google::protobuf::Metadata MysqlRequest::GetMetadata() const {
+    protobuf_AssignDescriptorsOnce();
+    ::google::protobuf::Metadata metadata;
+    metadata.descriptor = MysqlRequest_descriptor_;
+    metadata.reflection = NULL;
+    return metadata;
+}
+
+bool MysqlRequest::Query(const butil::StringPiece& command) {
+    if (_has_error) {
+        return false;
+    }
+
+    if (_has_command) {
+        return false;
+    }
+
+    const butil::Status st = MysqlMakeCommand(&_buf, MYSQL_COM_QUERY, command);
+    if (st.ok()) {
+        _has_command = true;
+        return true;
+    } else {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+}
+
+bool MysqlRequest::AddParam(int8_t p) {
+    if (_has_error) {
+        return false;
+    }
+    const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_TINY);
+    if (st.ok()) {
+        ++_param_index;
+        return true;
+    } else {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+}
+bool MysqlRequest::AddParam(uint8_t p) {

Review Comment:
   Please add a blank line here; same below.



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // if can't found stmt_id in this socket, create prepared statement on it, store user
+        // request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());
+            const_cast<AuthContext*>(ctx)->set_group(auth_step[0]);
+        } else {
+            parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+            LOG(ERROR) << "[MYSQL PARSE] wrong pack authentication data";
+        }
+    } else if (msg->response.reply_size() > 0) {
+        for (size_t i = 0; i < msg->response.reply_size(); ++i) {
+            if (!msg->response.reply(i).is_ok()) {
+                LOG(ERROR) << "[MYSQL PARSE] auth failed " << msg->response;
+                parseCode = PARSE_ERROR_NO_RESOURCE;
+                goto END_OF_AUTH;
+            }
+        }
+        std::string params, params_cmd;
+        MysqlParseParams(ctx->user(), &params);
+        if (ctx->group() == auth_step[0] && !params.empty()) {
+            if (MysqlPackParams(params, &params_cmd) == 0) {
+                butil::IOBuf buf;
+                buf.append(params_cmd);
+                buf.cut_into_file_descriptor(socket->fd());
+                const_cast<AuthContext*>(ctx)->set_group(auth_step[1]);
+            } else {
+                parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+                LOG(ERROR) << "[MYSQL PARSE] wrong pack params data";
+            }
+        } else {
+            butil::IOBuf raw_req;
+            raw_req.append(ctx->starter());
+            raw_req.cut_into_file_descriptor(socket->fd());
+            pi->with_auth = false;
+        }
+    } else {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] wrong authentication step";
+    }
+
+END_OF_AUTH:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+ParseError HandlePrepareStatement(const InputResponse* msg,
+                                  const Socket* socket,
+                                  PipelinedInfo* pi) {
+    if (!msg->response.reply(0).is_prepare_ok()) {
+        LOG(ERROR) << "[MYSQL PARSE] response is not prepare ok, " << msg->response;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    const MysqlReply::PrepareOk& ok = msg->response.reply(0).prepare_ok();
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    ParseError parseCode = PARSE_OK;
+    butil::IOBuf buf;
+    butil::Status st;
+    auto stub = ControllerPrivateAccessor(cntl).get_stmt();
+    auto stmt = stub->stmt();
+    if (stmt == NULL || stmt->param_count() != ok.param_count()) {

Review Comment:
   stmt->param_count() != ok.param_count() is already checked further down,
   so there is no need to repeat the check here.



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // if can't found stmt_id in this socket, create prepared statement on it, store user
+        // request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());

Review Comment:
   The return value is not checked, so a failed write would go unnoticed.
   Also, wouldn't Socket::Write be more appropriate here?
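
   To illustrate the first point, here is a small sketch of a checked write using plain POSIX `write()`. `WriteFully` is a hypothetical helper, not a brpc API, and it only shows the kind of failure handling that is missing. It treats `EAGAIN` as a failure, so on a non-blocking fd it is not a replacement for going through the socket's own write path.

   ```cpp
   #include <cassert>
   #include <cerrno>
   #include <cstddef>
   #include <unistd.h>

   // Writes all `len` bytes of `data` to `fd`.
   // Returns true only if every byte was written, false on any error other
   // than EINTR. Hypothetical helper for illustration; not part of brpc.
   inline bool WriteFully(int fd, const char* data, std::size_t len) {
       assert(data != nullptr || len == 0);
       std::size_t written = 0;
       while (written < len) {
           const ssize_t n = ::write(fd, data + written, len - written);
           if (n < 0) {
               if (errno == EINTR) {
                   continue;  // interrupted by a signal, retry
               }
               return false;  // real failure, errno is left set for the caller
           }
           if (n == 0) {
               return false;  // no progress, give up instead of spinning
           }
           written += static_cast<std::size_t>(n);
       }
       return true;
   }
   ```

   In HandleAuthentication the equivalent would be to inspect the result of `cut_into_file_descriptor` (or of Socket::Write) and return a parse error instead of calling `set_group(auth_step[0])` when the write fails.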



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yanglimin...@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // if can't found stmt_id in this socket, create prepared statement on it, store user
+        // request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());
+            const_cast<AuthContext*>(ctx)->set_group(auth_step[0]);
+        } else {
+            parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+            LOG(ERROR) << "[MYSQL PARSE] wrong pack authentication data";
+        }
+    } else if (msg->response.reply_size() > 0) {
+        for (size_t i = 0; i < msg->response.reply_size(); ++i) {
+            if (!msg->response.reply(i).is_ok()) {
+                LOG(ERROR) << "[MYSQL PARSE] auth failed " << msg->response;
+                parseCode = PARSE_ERROR_NO_RESOURCE;
+                goto END_OF_AUTH;
+            }
+        }
+        std::string params, params_cmd;
+        MysqlParseParams(ctx->user(), &params);
+        if (ctx->group() == auth_step[0] && !params.empty()) {
+            if (MysqlPackParams(params, &params_cmd) == 0) {
+                butil::IOBuf buf;
+                buf.append(params_cmd);
+                buf.cut_into_file_descriptor(socket->fd());
+                const_cast<AuthContext*>(ctx)->set_group(auth_step[1]);
+            } else {
+                parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+                LOG(ERROR) << "[MYSQL PARSE] wrong pack params data";
+            }
+        } else {
+            butil::IOBuf raw_req;
+            raw_req.append(ctx->starter());
+            raw_req.cut_into_file_descriptor(socket->fd());
+            pi->with_auth = false;
+        }
+    } else {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] wrong authentication step";
+    }
+
+END_OF_AUTH:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+ParseError HandlePrepareStatement(const InputResponse* msg,
+                                  const Socket* socket,
+                                  PipelinedInfo* pi) {
+    if (!msg->response.reply(0).is_prepare_ok()) {
+        LOG(ERROR) << "[MYSQL PARSE] response is not prepare ok, " << msg->response;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    const MysqlReply::PrepareOk& ok = msg->response.reply(0).prepare_ok();
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    ParseError parseCode = PARSE_OK;
+    butil::IOBuf buf;
+    butil::Status st;
+    auto stub = ControllerPrivateAccessor(cntl).get_stmt();
+    auto stmt = stub->stmt();
+    if (stmt == NULL || stmt->param_count() != ok.param_count()) {
+        LOG(ERROR) << "[MYSQL PACK] stmt can't be NULL";
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    if (stmt->param_count() != ok.param_count()) {
+        LOG(ERROR) << "[MYSQL PACK] stmt param number " << stmt->param_count()
+                   << " not equal to prepareOk.param_number " << ok.param_count();
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    stmt->SetStatementId(socket->id(), ok.stmt_id());
+    st = stub->PackExecuteCommand(&buf, ok.stmt_id());
+    if (!st.ok()) {
+        LOG(ERROR) << "[MYSQL PACK] make execute header error " << st;
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    buf.cut_into_file_descriptor(socket->fd());
+    pi->count = MYSQL_PREPARED_STATEMENT;
+END_OF_PREPARE:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+}  // namespace
+
+// "Message" = "Response" as we only implement the client for mysql.
+ParseResult ParseMysqlMessage(butil::IOBuf* source,
+                              Socket* socket,
+                              bool /*read_eof*/,
+                              const void* /*arg*/) {
+    if (source->empty()) {
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+
+    PipelinedInfo pi;
+    if (!socket->PopPipelinedInfo(&pi)) {
+        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
+        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+    }
+
+    InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
+    if (msg == NULL) {
+        msg = new InputResponse;
+        socket->reset_parsing_context(msg);
+    }
+
+    MysqlStmtType stmt_type = static_cast<MysqlStmtType>(pi.count);
+    ParseError err = msg->response.ConsumePartialIOBuf(*source, pi.with_auth, stmt_type);
+    if (FLAGS_mysql_verbose) {
+        LOG(INFO) << "[MYSQL PARSE] " << msg->response;
+    }
+    if (err != PARSE_OK) {
+        if (err == PARSE_ERROR_NOT_ENOUGH_DATA) {
+            socket->GivebackPipelinedInfo(pi);
+        }
+        return MakeParseError(err);
+    }
+    if (pi.with_auth) {
+        ParseError err = HandleAuthentication(msg, socket, &pi);
+        if (err != PARSE_OK) {
+            return MakeParseError(err, "Fail to authenticate with Mysql");
+        }
+        DestroyingPtr<InputResponse> auth_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    if (stmt_type == MYSQL_NEED_PREPARE) {
+        // store stmt_id, make execute header.
+        ParseError err = HandlePrepareStatement(msg, socket, &pi);
+        if (err != PARSE_OK) {
+            return MakeParseError(err, "Fail to make parepared statement with Mysql");
+        }
+        DestroyingPtr<InputResponse> prepare_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    msg->id_wait = pi.id_wait;
+    socket->release_parsing_context();
+    return MakeMessage(msg);
+}
+
+void ProcessMysqlResponse(InputMessageBase* msg_base) {
+    const int64_t start_parse_us = butil::cpuwide_time_us();
+    DestroyingPtr<InputResponse> msg(static_cast<InputResponse*>(msg_base));
+
+    const bthread_id_t cid = msg->id_wait;
+    Controller* cntl = NULL;
+    const int rc = bthread_id_lock(cid, (void**)&cntl);
+    if (rc != 0) {
+        LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
+            << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
+        return;
+    }
+
+    ControllerPrivateAccessor accessor(cntl);
+    Span* span = accessor.span();
+    if (span) {
+        span->set_base_real_us(msg->base_real_us());
+        span->set_received_us(msg->received_us());
+        span->set_response_size(msg->response.ByteSize());
+        span->set_start_parse_us(start_parse_us);
+    }
+    const int saved_error = cntl->ErrorCode();
+    if (cntl->response() != NULL) {
+        if (cntl->response()->GetDescriptor() != MysqlResponse::descriptor()) {
+            cntl->SetFailed(ERESPONSE, "Must be MysqlResponse");
+        } else {
+            // We work around ParseFrom of pb which is just a placeholder.
+            ((MysqlResponse*)cntl->response())->Swap(&msg->response);
+        }
+    }  // silently ignore the response.
+
+    // Unlocks correlation_id inside. Revert controller's
+    // error code if it version check of `cid' fails
+    msg.reset();  // optional, just release resourse ASAP
+    accessor.OnResponse(cid, saved_error);
+}
+
+void SerializeMysqlRequest(butil::IOBuf* buf,
+                           Controller* cntl,
+                           const google::protobuf::Message* request) {
+    if (request == NULL) {
+        return cntl->SetFailed(EREQUEST, "request is NULL");
+    }
+    if (request->GetDescriptor() != MysqlRequest::descriptor()) {
+        return cntl->SetFailed(EREQUEST, "The request is not a MysqlRequest");
+    }
+    const MysqlRequest* rr = (const MysqlRequest*)request;
+    // We work around SerializeTo of pb which is just a placeholder.
+    if (!rr->SerializeTo(buf)) {
+        return cntl->SetFailed(EREQUEST, "Fail to serialize MysqlRequest");
+    }
+    // mysql protocol don't use pipelined count to verify the end of a response, so pipelined count
+    // is meanless, but we can use it help us to distinguish mysql reply type. In mysql protocol, we
+    // can't distinguish OK and PreparedOk, so we set pipelined count to 2 to let parse function to

Review Comment:
   Using pipelined_count this way feels rather odd. Would it be more appropriate to carry this in auth_flags instead?
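
One way to read this suggestion, sketched under assumptions: keep `count` for its usual meaning and carry the statement state in a separate flags field. The struct and flag names below are hypothetical stand-ins for illustration; they are not taken from the PR or from brpc's headers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical flag bits; the names are illustrative only.
enum MysqlPipelineFlag : uint32_t {
    MYSQL_FLAG_NONE = 0,
    MYSQL_FLAG_AUTH = 1u << 0,           // handshake still in progress
    MYSQL_FLAG_NEED_PREPARE = 1u << 1,   // prepare sent, waiting for prepare-OK
    MYSQL_FLAG_PREPARED_EXEC = 1u << 2,  // execute sent for a prepared statement
};

// Hypothetical stand-in for the per-request pipelined record.
struct PipelinedInfoSketch {
    uint32_t count = 0;       // keeps its usual meaning: number of pipelined replies
    uint32_t auth_flags = 0;  // protocol-specific state lives here instead
};

inline bool NeedsPrepare(const PipelinedInfoSketch& pi) {
    return (pi.auth_flags & MYSQL_FLAG_NEED_PREPARE) != 0;
}

// Called once the prepare-OK reply has been handled.
inline void MarkPrepared(PipelinedInfoSketch* pi) {
    pi->auth_flags &= ~static_cast<uint32_t>(MYSQL_FLAG_NEED_PREPARE);
    pi->auth_flags |= MYSQL_FLAG_PREPARED_EXEC;
}
```

With this shape the parser can branch on named bits rather than on magic values of `count`, and `count` is left untouched by the prepare/execute handoff.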



##########
src/brpc/policy/mysql_auth_hash.cpp:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2.0, as

Review Comment:
   This file is under the GPL, so we cannot use it. Please see whether you can implement this yourself.
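
For reference, the mysql_native_password response is small enough to reimplement from the protocol description: SHA1(password) XOR SHA1(scramble + SHA1(SHA1(password))). The sketch below assumes the caller supplies a SHA-1 function from a source with a compatible license; `HashFn` and `NativePasswordToken` are hypothetical names, not existing brpc APIs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Any SHA-1 implementation with a compatible license can be plugged in here.
using HashFn = std::function<std::string(const std::string&)>;

// Sketch of the mysql_native_password response:
//   SHA1(password) XOR SHA1(scramble + SHA1(SHA1(password)))
// `scramble` is the nonce sent by the server in its handshake packet.
std::string NativePasswordToken(const std::string& password,
                                const std::string& scramble,
                                const HashFn& sha1) {
    if (password.empty()) {
        return std::string();  // an empty password sends an empty auth response
    }
    const std::string stage1 = sha1(password);
    const std::string stage2 = sha1(stage1);
    const std::string mask = sha1(scramble + stage2);
    std::string token(stage1.size(), '\0');
    for (size_t i = 0; i < stage1.size(); ++i) {
        token[i] = static_cast<char>(stage1[i] ^ mask[i]);
    }
    return token;
}
```

Injecting the hash keeps the sketch independent of any particular crypto library; only the XOR composition is shown here, and the hash itself is left to the caller.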



##########
src/brpc/policy/mysql_authenticator.cpp:
##########
@@ -0,0 +1,176 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Author(s): Yang,Liming <yanglimin...@baidu.com>
+
+#include <vector>
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_auth_hash.h"
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_reply.h"
+#include "brpc/mysql_common.h"
+#include "butil/base64.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+namespace {
+const butil::StringPiece mysql_native_password("mysql_native_password");
+const char* auth_param_delim = "\t";
+bool MysqlHandleParams(const butil::StringPiece& params, std::string* param_cmd) {
+    if (params.empty()) {
+        return true;
+    }
+    const char* delim1 = "&";
+    std::vector<size_t> idx;
+    for (size_t p = params.find(delim1); p != butil::StringPiece::npos;
+         p = params.find(delim1, p + 1)) {
+        idx.push_back(p);
+    }
+
+    const char* delim2 = "=";
+    std::stringstream ss;
+    for (size_t i = 0; i < idx.size() + 1; ++i) {
+        size_t pos = (i > 0) ? idx[i - 1] + 1 : 0;
+        size_t len = (i < idx.size()) ? idx[i] - pos : params.size() - pos;
+        butil::StringPiece raw(params.data() + pos, len);
+        const size_t p = raw.find(delim2);
+        if (p != butil::StringPiece::npos) {
+            butil::StringPiece k(raw.data(), p);
+            butil::StringPiece v(raw.data() + p + 1, raw.size() - p - 1);
+            if (k == "charset") {
+                ss << "SET NAMES " << v << ";";
+            } else {
+                ss << "SET " << k << "=" << v << ";";
+            }
+        }
+    }
+    *param_cmd = ss.str();
+    return true;
+}
+};  // namespace
+
+// user + "\t" + password + "\t" + schema + "\t" + collation + "\t" + param
+bool MysqlAuthenticator::SerializeToString(std::string* str) const {
+    std::stringstream ss;
+    ss << _user << auth_param_delim;
+    ss << _passwd << auth_param_delim;
+    ss << _schema << auth_param_delim;
+    ss << _collation << auth_param_delim;
+    std::string param_cmd;
+    if (MysqlHandleParams(_params, &param_cmd)) {
+        ss << param_cmd;
+    } else {
+        LOG(ERROR) << "handle mysql authentication params failed, ignore it";
+        return false;
+    }
+    *str = ss.str();
+    return true;
+}
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,

Review Comment:
   Could this be implemented as MysqlAuthenticator::ParseFromString? That would be more symmetric with SerializeToString.
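
A rough sketch of what the symmetric parser could look like, assuming the tab-delimited layout from the comment above SerializeToString (user, password, schema, collation, then params). `MysqlAuthFields` and `ParseAuthFields` are hypothetical names, and the sketch uses plain `std::string` rather than `butil::StringPiece` so that it stands alone.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical value type mirroring the five serialized fields.
struct MysqlAuthFields {
    std::string user, password, schema, collation, params;
};

// Inverse of the "user\tpassword\tschema\tcollation\tparams" layout.
// Returns false unless all four delimiters are present.
bool ParseAuthFields(const std::string& raw, MysqlAuthFields* out) {
    std::string* const fields[] = {&out->user, &out->password, &out->schema, &out->collation};
    size_t pos = 0;
    for (std::string* field : fields) {
        const size_t tab = raw.find('\t', pos);
        if (tab == std::string::npos) {
            return false;
        }
        field->assign(raw, pos, tab - pos);
        pos = tab + 1;
    }
    out->params.assign(raw, pos, std::string::npos);  // remainder, may be empty
    return true;
}
```

Like the serializer, this assumes none of the first four fields contains a tab; a password with an embedded tab would need escaping or a length-prefixed layout.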



##########
src/brpc/policy/mysql_authenticator.cpp:
##########
@@ -0,0 +1,176 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Author(s): Yang,Liming <yanglimin...@baidu.com>
+
+#include <vector>
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_auth_hash.h"
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_reply.h"
+#include "brpc/mysql_common.h"
+#include "butil/base64.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+namespace {
+const butil::StringPiece mysql_native_password("mysql_native_password");
+const char* auth_param_delim = "\t";
+bool MysqlHandleParams(const butil::StringPiece& params, std::string* param_cmd) {

Review Comment:
   Please add a blank line here.


