imay commented on a change in pull request #1323: This commit has brought
contribution to streaming mini load
URL: https://github.com/apache/incubator-doris/pull/1323#discussion_r294585516
##########
File path: be/src/http/action/mini_load.cpp
##########
@@ -516,4 +622,229 @@ bool LoadHandleCmp::operator() (const LoadHandle& lhs,
const LoadHandle& rhs) co
return false;
}
+/* Asks the FE master to begin the txn for this mini load and to record the
+ * metadata of the load.
+ *
+ * The request carries auth, db, table and label from ctx. sub_label,
+ * timeout_second and max_filter_ratio are set through their __set_ accessors
+ * only when ctx holds something other than empty / -1 / 0.0 respectively.
+ *
+ * Returns the error from FrontendHelper::rpc, or the non-OK status carried in
+ * the FE result. On success ctx->txn_id is filled from the result,
+ * ctx->need_rollback is set and Status::OK() is returned.
+ */
+Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
+ // prepare begin mini load request params
+ TMiniLoadBeginRequest request;
+ set_request_auth(&request, ctx->auth);
+ request.db = ctx->db;
+ request.tbl = ctx->table;
+ request.label = ctx->label;
+ if (!ctx->sub_label.empty()) {
+ request.__set_sub_label(ctx->sub_label);
+ }
+ if (ctx->timeout_second != -1) { // presumably -1 means no timeout was supplied - confirm
+ request.__set_timeout_second(ctx->timeout_second);
+ }
+ if (ctx->max_filter_ratio != 0.0) { // a ratio of exactly 0.0 is never sent to the FE
+ request.__set_max_filter_ratio(ctx->max_filter_ratio);
+ }
+ request.create_timestamp = GetCurrentTimeMicros();
+ // begin load by master: the lambda runs against the FE client and fills res
+ const TNetworkAddress& master_addr =
_exec_env->master_info()->network_address;
+ TMiniLoadBeginResult res;
+ RETURN_IF_ERROR(FrontendHelper::rpc(
+ master_addr.hostname, master_addr.port,
+ [&request, &res] (FrontendServiceConnection& client) {
+ client->miniLoadBegin(res, request);
+ }));
+ Status begin_status(res.status); // the RPC itself succeeded; this is the status reported by the FE
+ if (!begin_status.ok()) { // NOTE(review): logged at INFO, while _process_put logs its FE failure at WARNING
+ LOG(INFO) << "failed to begin mini load " << ctx->label << " with
error msg:"
+ << begin_status.get_error_msg();
+ return begin_status;
+ }
+ ctx->txn_id = res.txn_id;
+ // txn has been begun in fe, so mark it for rollback should a later step fail
+ ctx->need_rollback = true;
+ return Status::OK();
+}
+
+Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
+ /* Requests the plan for this load from the FE master.
+ *
+ * Fills a TStreamLoadPutRequest from ctx (auth, db, table, txn id, format,
+ * load id) and from the query parameters of req (COLUMNS_KEY merged with
+ * HLL_KEY, and COLUMN_SEPARATOR_KEY), then calls streamLoadPut, which
+ * writes its answer into ctx->put_result.
+ *
+ * Returns InvalidArgument when the hll parameter is empty or when
+ * split_string_to_map leaves the hll map empty, the error from
+ * FrontendHelper::rpc, the non-OK status reported in ctx->put_result, or
+ * Status::OK().
+ *
+ * First step: prepare request parameters.
+ */
+ TStreamLoadPutRequest put_request;
+ set_request_auth(&put_request, ctx->auth);
+ put_request.db = ctx->db;
+ put_request.tbl = ctx->table;
+ put_request.txnId = ctx->txn_id;
+ put_request.formatType = ctx->format;
+ put_request.__set_loadId(ctx->id.to_thrift());
+ put_request.fileType = TFileType::FILE_STREAM;
+ std::map<std::string, std::string> params(
+ req->query_params().begin(), req->query_params().end()); // local copy of the query parameters
+ /* Merge the columns and hll query parameters into a single columns value.
+ * For example:
+ * input: columns=c1,tmp_c2,tmp_c3\&hll=hll_c2,tmp_c2:hll_c3,tmp_c3
+ * output: columns=c1,tmp_c2,tmp_c3,hll_c2=hll_hash(tmp_c2),hll_c3=hll_hash(tmp_c3)
+ */
+ std::map<std::string, std::string>::iterator columns_it =
params.find(COLUMNS_KEY);
+ if (columns_it != params.end()) { // the hll parameter is only consulted when columns is present
+ std::string columns_value = columns_it->second;
+ std::map<std::string, std::string>::iterator hll_it =
params.find(HLL_KEY);
+ if (hll_it != params.end()) {
+ std::string hll_value = hll_it->second;
+ if (hll_value.empty()) {
+ return Status::InvalidArgument("Hll value could not be empty
when hll key is exists!");
+ }
+ std::map<std::string, std::string> hll_map;
+ split_string_to_map(hll_value, ":", ",", &hll_map); // helper not visible here; going by the example above, entries are colon-separated and each entry is target,source
+ if (hll_map.empty()) {
+ return Status::InvalidArgument("Hll value could not tranform
to hll expr: " + hll_value);
+ }
+ auto hll_map_it = hll_map.begin();
+ while (hll_map_it != hll_map.end()) { // std::map iterates in key order, so the expressions are appended sorted by key, not in input order
+ columns_value += "," + hll_map_it->first
+ + "=hll_hash(" + hll_map_it->second + ")";
+ ++hll_map_it;
+ }
+ }
+ put_request.__set_columns(columns_value);
+ }
+ std::map<std::string, std::string>::iterator column_separator_it =
params.find(COLUMN_SEPARATOR_KEY);
+ if (column_separator_it != params.end()) {
+ put_request.__set_columnSeparator(column_separator_it->second);
+ }
+
+ // plan this load: the FE master writes its answer into ctx->put_result
+ TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ RETURN_IF_ERROR(FrontendHelper::rpc(master_addr.hostname, master_addr.port,
+ [&put_request, ctx] (FrontendServiceConnection& client) {
+ client->streamLoadPut(ctx->put_result, put_request);
+ }));
+ Status plan_status(ctx->put_result.status); // the RPC succeeded; this is the planning status reported by the FE
+ if (!plan_status.ok()) {
+ LOG(WARNING) << "plan streaming load failed. errmsg=" <<
plan_status.get_error_msg()
+ << ctx->brief();
+ return plan_status;
+ }
+ VLOG(3) << "params is " <<
apache::thrift::ThriftDebugString(ctx->put_result.params);
+ return Status::OK();
+}
+
+/* New on_header implementation for mini load.
+ *
+ * In order:
+ *  1. If Content-Length is present, reject bodies larger than
+ *     config::mini_load_max_mb * 1024 * 1024 bytes; otherwise set that limit
+ *     on the evhttp connection.
+ *  2. Run check_request(req).
+ *  3. Allocate a StreamLoadContext, ref() it and attach it to req as the
+ *     handler context.
+ *  4. Fill the context from basic auth and from the request parameters.
+ *  5. Begin the txn on the FE, register a StreamLoadPipe as the body sink,
+ *     fetch the plan from the FE and start executing it.
+ *
+ * Returns the first non-OK Status produced by these steps, otherwise the
+ * result of execute_plan_fragment(ctx). Failures in steps 1 and 2 return
+ * before any context is attached to req.
+ *
+ * NOTE(review): std::stol and std::stoi throw on non-numeric or out-of-range
+ * text and no try/catch is visible in this function - confirm that a caller
+ * handles that for client-supplied Content-Length and timeout values.
+ * NOTE(review): a negative Content-Length converts to a very large size_t and
+ * is therefore rejected by the size check.
+ * NOTE(review): the release matching ctx->ref() is not visible here.
+ */
+Status MiniLoadAction::_on_new_header(HttpRequest* req) {
+ size_t body_bytes = 0;
+ size_t max_body_bytes = config::mini_load_max_mb * 1024 * 1024; // limit in bytes; the type of mini_load_max_mb is not visible here
+ if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+ body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); // long converted to size_t
+ if (body_bytes > max_body_bytes) {
+ std::stringstream ss;
+ ss << "file size exceed max body size, max_body_bytes=" <<
max_body_bytes;
+ return Status::InvalidArgument(ss.str());
+ }
+ } else { // no declared length: hand the limit to the evhttp connection instead
+ evhttp_connection_set_max_body_size(
+ evhttp_request_get_connection(req->get_evhttp_request()),
+ max_body_bytes);
+ }
+
+ RETURN_IF_ERROR(check_request(req));
+
+ StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
+ ctx->ref();
+ req->set_handler_ctx(ctx); // from here on the context is reachable through req->handler_ctx()
+
+ // auth information: parsed from the request into ctx->auth
+ if (!parse_basic_auth(*req, &ctx->auth)) {
+ LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
+ return Status::InvalidArgument("no valid Basic authorization");
+ }
+
+ ctx->load_type = TLoadType::MINI_LOAD;
+ ctx->load_src_type = TLoadSourceType::RAW;
+
+ ctx->db = req->param(DB_KEY);
+ ctx->table = req->param(TABLE_KEY);
+ ctx->label = req->param(LABEL_KEY);
+ if(!req->param(SUB_LABEL_KEY).empty()) {
+ ctx->sub_label = req->param(SUB_LABEL_KEY);
+ }
+ ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; // the format is fixed here, not read from the request
+ std::map<std::string, std::string> params(
+ req->query_params().begin(), req->query_params().end()); // local copy of the query parameters
+ std::map<std::string, std::string>::iterator max_filter_ratio_it =
params.find(MAX_FILTER_RATIO_KEY);
+ if (max_filter_ratio_it != params.end()) { // strtod is called without an end pointer, so trailing garbage is not detected
+ ctx->max_filter_ratio = strtod(max_filter_ratio_it->second.c_str(),
nullptr);
+ }
+ std::map<std::string, std::string>::iterator timeout_it =
params.find(TIMEOUT_KEY);
+ if (timeout_it != params.end()) {
+ ctx->timeout_second = std::stoi(timeout_it->second);
+ }
+
+ LOG(INFO) << "new income mini load request." << ctx->brief()
+ << ", db: " << ctx->db << ", tbl: " << ctx->table;
+
+ // record metadata in frontend; on success this sets ctx->txn_id and ctx->need_rollback
+ RETURN_IF_ERROR(_begin_mini_load(ctx));
+
+ // open sink: register a fresh pipe under ctx->id, then expose it as ctx->body_sink
+ auto pipe = std::make_shared<StreamLoadPipe>();
+ RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
+ ctx->body_sink = pipe;
+
+ // get plan from fe; the plan is left in ctx->put_result
+ RETURN_IF_ERROR(_process_put(req, ctx));
+
+ // execute plan; its Status is the result of this function
+ return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+}
+
+void MiniLoadAction::_new_handle(HttpRequest* req) {
+ StreamLoadContext* ctx = (StreamLoadContext*) req->handler_ctx();
+ DCHECK(ctx != nullptr);
+
+ if (ctx->status.ok()) {
+ ctx->status = _on_new_handle(ctx);
+ if (!ctx->status.ok()) {
+ LOG(WARNING) << "handle mini load failed, id=" << ctx->id
+ << ", errmsg=" << ctx->status.get_error_msg();
+ }
+ }
+
+ if (!ctx->status.ok()) {
+ if (ctx->need_rollback) {
+ _exec_env->stream_load_executor()->rollback_txn(ctx);
+ ctx->need_rollback = false;
+ }
+ if (ctx->body_sink.get() != nullptr) {
+ ctx->body_sink->cancel();
+ }
+ }
+
+ std::string status_str = "Success";
+ std::string msg = "OK";
+ if (!ctx->status.ok()) {
+ // we do not send 500 reply to client, send 200 with error msg
+ status_str = "FAILED";
+ msg = ctx->status.get_error_msg();
+ }
+ std::stringstream ss;
Review comment:
use json util
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]