EmmyMiao87 commented on a change in pull request #1323: This commit has brought 
contribution to streaming mini load
URL: https://github.com/apache/incubator-doris/pull/1323#discussion_r294246984
 
 

 ##########
 File path: be/src/http/action/mini_load.cpp
 ##########
 @@ -516,4 +622,230 @@ bool LoadHandleCmp::operator() (const LoadHandle& lhs, 
const LoadHandle& rhs) co
     return false;
 }
 
+// fe will begin the txn and record the metadata of load 
+Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
+    // prepare begin mini load request params
+    TMiniLoadBeginRequest request;
+    set_request_auth(&request, ctx->auth);
+    request.db = ctx->db;
+    request.tbl = ctx->table;
+    request.label = ctx->label;
+    if (!ctx->sub_label.empty()) {
+        request.__set_sub_label(ctx->sub_label);
+    }
+    if (ctx->timeout_second != -1) {
+        request.__set_timeout_second(ctx->timeout_second);
+    }
+    if (ctx->max_filter_ratio != 0.0) {
+        request.__set_max_filter_ratio(ctx->max_filter_ratio);
+    }
+    request.create_timestamp = GetCurrentTimeMicros();
+    // begin load by master
+    const TNetworkAddress& master_addr = 
_exec_env->master_info()->network_address;
+    TMiniLoadBeginResult res;
+    RETURN_IF_ERROR(FrontendHelper::rpc(
+            master_addr.hostname, master_addr.port,
+            [&request, &res] (FrontendServiceConnection& client) {
+            client->miniLoadBegin(res, request);
+            }));
+    Status begin_status(res.status);
+    if (!begin_status.ok()) {
+        LOG(INFO) << "failed to begin mini load " << ctx->label << " with 
error msg:"
+                  << begin_status.get_error_msg();
+        return begin_status;
+    }
+    ctx->txn_id = res.txn_id;
+    // txn has been begun in fe
+    ctx->need_rollback = true;
+    return Status::OK;
+}
+
+Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
+    // prepare request parameters
+    TStreamLoadPutRequest put_request;
+    set_request_auth(&put_request, ctx->auth);
+    put_request.db = ctx->db;
+    put_request.tbl = ctx->table;
+    put_request.txnId = ctx->txn_id;
+    put_request.formatType = ctx->format;
+    put_request.__set_loadId(ctx->id.to_thrift());
+    put_request.fileType = TFileType::FILE_STREAM;
+    std::map<std::string, std::string> params(
+            req->query_params().begin(), req->query_params().end());
+    std::map<std::string, std::string>::iterator columns_it = 
params.find(COLUMNS_KEY);
+    if (columns_it != params.end()) {
+        std::string columns_value = columns_it->second;
+        std::map<std::string, std::string>::iterator hll_it = 
params.find(HLL_KEY);
+        if (hll_it != params.end()) {
+            std::string hll_value = hll_it->second;
+            if (hll_value.empty()) {
+                return Status("Hll value could not be empty when hll key is 
exists!"); 
+            }
+            std::map<std::string, std::string>* hll_map = new 
std::map<std::string, std::string>;
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to