imay commented on a change in pull request #1323: This commit has brought
contribution to streaming mini load
URL: https://github.com/apache/incubator-doris/pull/1323#discussion_r295650388
##########
File path: be/src/http/action/mini_load.cpp
##########
@@ -516,4 +602,217 @@ bool LoadHandleCmp::operator() (const LoadHandle& lhs,
const LoadHandle& rhs) co
return false;
}
+// fe will begin the txn and record the metadata of load
+Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
+ // prepare begin mini load request params
+ TMiniLoadBeginRequest request;
+ set_request_auth(&request, ctx->auth);
+ request.db = ctx->db;
+ request.tbl = ctx->table;
+ request.label = ctx->label;
+ if (!ctx->sub_label.empty()) {
+ request.__set_sub_label(ctx->sub_label);
+ }
+ if (ctx->timeout_second != -1) {
+ request.__set_timeout_second(ctx->timeout_second);
+ }
+ if (ctx->max_filter_ratio != 0.0) {
+ request.__set_max_filter_ratio(ctx->max_filter_ratio);
+ }
+ request.create_timestamp = GetCurrentTimeMicros();
+ // begin load by master
+ const TNetworkAddress& master_addr =
_exec_env->master_info()->network_address;
+ TMiniLoadBeginResult res;
+ RETURN_IF_ERROR(FrontendHelper::rpc(
+ master_addr.hostname, master_addr.port,
+ [&request, &res] (FrontendServiceConnection& client) {
+ client->miniLoadBegin(res, request);
+ }));
+ Status begin_status(res.status);
+ if (!begin_status.ok()) {
+ LOG(INFO) << "failed to begin mini load " << ctx->label << " with
error msg:"
+ << begin_status.get_error_msg();
+ return begin_status;
+ }
+ ctx->txn_id = res.txn_id;
+ // txn has been begun in fe
+ ctx->need_rollback = true;
+ return Status::OK();
+}
+
+Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
+ // prepare request parameters
+ TStreamLoadPutRequest put_request;
+ set_request_auth(&put_request, ctx->auth);
+ put_request.db = ctx->db;
+ put_request.tbl = ctx->table;
+ put_request.txnId = ctx->txn_id;
+ put_request.formatType = ctx->format;
+ put_request.__set_loadId(ctx->id.to_thrift());
+ put_request.fileType = TFileType::FILE_STREAM;
+ std::map<std::string, std::string> params(
+ req->query_params().begin(), req->query_params().end());
+ /* merge params of columns and hll
+ * for example:
+ * input: columns=c1,tmp_c2,tmp_c3\&hll=hll_c2,tmp_c2:hll_c3,tmp_c3
+ * output:
columns=c1,tmp_c2,tmp_c3,hll_c2=hll_hash(tmp_c2),hll_c3=hll_hash(tmp_c3)
+ */
+ std::map<std::string, std::string>::iterator columns_it =
params.find(COLUMNS_KEY);
+ if (columns_it != params.end()) {
+ std::string columns_value = columns_it->second;
+ std::map<std::string, std::string>::iterator hll_it =
params.find(HLL_KEY);
+ if (hll_it != params.end()) {
+ std::string hll_value = hll_it->second;
+ if (hll_value.empty()) {
+ return Status::InvalidArgument("Hll value could not be empty
when hll key is exists!");
+ }
+ std::map<std::string, std::string> hll_map;
+ RETURN_IF_ERROR(StringParser::split_string_to_map(hll_value, ":",
",", &hll_map));
+ if (hll_map.empty()) {
+ return Status::InvalidArgument("Hll value could not tranform
to hll expr: " + hll_value);
+ }
+ auto hll_map_it = hll_map.begin();
+ while (hll_map_it != hll_map.end()) {
Review comment:
I think what you want here, instead of the manual iterator `while` loop, is a range-based for loop: `for (auto& it : hll_map)`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]