This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new edd501b5b1a [streamload](redirect) Support redirect-policy for
streamload which is used by audit plugin (#35840)
edd501b5b1a is described below
commit edd501b5b1a5cda2dc43d02008b3a79f7be02c8a
Author: Gavin Chou <[email protected]>
AuthorDate: Wed Jun 5 08:51:16 2024 +0800
[streamload](redirect) Support redirect-policy for streamload which is used
by audit plugin (#35840)
The audit log plugin uses streamload to save audit data. In cloud mode, the
streamload initiated by the audit-log component may be forwarded to a
Load Balancer (LB) attached to a Backend (BE). This can cause the
request to fail due to network issues.
This PR adds a new streamload redirect policy to get rid of the LB
issue.
---
be/src/common/utils.h | 3 +-
be/src/http/utils.cpp | 35 ++++++++++---------
.../org/apache/doris/httpv2/rest/LoadAction.java | 40 ++++++++++++++++++----
.../org/apache/doris/load/StreamLoadHandler.java | 7 ++++
.../doris/plugin/audit/AuditStreamLoader.java | 6 ++--
5 files changed, 64 insertions(+), 27 deletions(-)
diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index b03e41260e7..46df44a40f2 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -37,18 +37,17 @@ struct AuthInfo {
template <class T>
void set_request_auth(T* req, const AuthInfo& auth) {
+ req->user = auth.user; // always set user, because it may be used by FE
if (auth.auth_code != -1) {
// if auth_code is set, no need to set other info
req->__set_auth_code(auth.auth_code);
// user name and passwd is unused, but they are required field.
// so they have to be set.
- req->user = "";
req->passwd = "";
} else if (auth.token != "") {
req->__isset.token = true;
req->token = auth.token;
} else {
- req->user = auth.user;
req->passwd = auth.passwd;
if (!auth.cluster.empty()) {
req->__set_cluster(auth.cluster);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index b03017c12a7..fbbc1cd93bf 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -77,30 +77,31 @@ bool parse_basic_auth(const HttpRequest& req, std::string*
user, std::string* pa
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
const auto& token = req.header("token");
const auto& auth_code = req.header(HTTP_AUTH_CODE);
+
+ std::tuple<std::string, std::string, std::string> tmp;
+ auto& [user, pass, cluster] = tmp;
+ bool valid_basic_auth = parse_basic_auth(req, &user, &pass);
+ if (valid_basic_auth) { // always set the basic auth, the user may be
useful
+ auto pos = user.find('@');
+ if (pos != std::string::npos) {
+ cluster.assign(user.c_str() + pos + 1);
+ user.assign(user.c_str(), pos); // user is updated
+ }
+ auth->user = user;
+ auth->passwd = pass;
+ auth->cluster = cluster;
+ }
+
if (!token.empty()) {
auth->token = token;
} else if (!auth_code.empty()) {
auth->auth_code = std::stoll(auth_code);
- } else {
- std::string full_user;
- if (!parse_basic_auth(req, &full_user, &auth->passwd)) {
- return false;
- }
- auto pos = full_user.find('@');
- if (pos != std::string::npos) {
- auth->user.assign(full_user.data(), pos);
- auth->cluster.assign(full_user.data() + pos + 1);
- } else {
- auth->user = full_user;
- }
+ } else if (!valid_basic_auth) {
+ return false;
}
// set user ip
- if (req.remote_host() != nullptr) {
- auth->user_ip.assign(req.remote_host());
- } else {
- auth->user_ip.assign("");
- }
+ auth->user_ip.assign(req.remote_host() != nullptr ? req.remote_host() :
"");
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index ca69ba13c08..de15a2816f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -57,6 +57,7 @@ import org.springframework.web.servlet.view.RedirectView;
import java.net.InetAddress;
import java.net.URI;
+import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
@@ -69,6 +70,11 @@ public class LoadAction extends RestBaseController {
public static final String SUB_LABEL_NAME_PARAM = "sub_label";
+ public static final String HEADER_REDIRECT_POLICY = "redirect-policy";
+
+ public static final String REDIRECT_POLICY_PUBLIC_PRIVATE =
"public-private";
+ public static final String REDIRECT_POLICY_RANDOM_BE = "random-be";
+
private ExecuteEnv execEnv = ExecuteEnv.getInstance();
private int lastSelectedBackendIndex = 0;
@@ -94,6 +100,7 @@ public class LoadAction extends RestBaseController {
public Object streamLoad(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
+ LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table,
getAllHeaders(request));
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null &&
groupCommitStr.equalsIgnoreCase("async_mode")) {
@@ -209,6 +216,7 @@ public class LoadAction extends RestBaseController {
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
+ LOG.info("streamload action 2PC, db: {}, headers: {}", db,
getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
@@ -222,6 +230,7 @@ public class LoadAction extends RestBaseController {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String
table) {
+ LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db,
table, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
@@ -348,8 +357,7 @@ public class LoadAction extends RestBaseController {
if (Strings.isNullOrEmpty(cloudClusterName)) {
throw new LoadException("No cloud cluster name selected.");
}
- String reqHostStr =
request.getHeader(HttpHeaderNames.HOST.toString());
- return selectCloudRedirectBackend(cloudClusterName, reqHostStr,
groupCommit);
+ return selectCloudRedirectBackend(cloudClusterName, request,
groupCommit);
} else {
return selectLocalRedirectBackend(groupCommit);
}
@@ -391,10 +399,18 @@ public class LoadAction extends RestBaseController {
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
- private TNetworkAddress selectCloudRedirectBackend(String clusterName,
String reqHostStr, boolean groupCommit)
+ private TNetworkAddress selectCloudRedirectBackend(String clusterName,
HttpServletRequest req, boolean groupCommit)
throws LoadException {
Backend backend = StreamLoadHandler.selectBackend(clusterName,
groupCommit);
+ String redirectPolicy =
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
+ // User specified redirect policy
+ if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
+ return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
+ }
+ redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
+ ? Config.streamload_redirect_policy : redirectPolicy;
+
Pair<String, Integer> publicHostPort = null;
Pair<String, Integer> privateHostPort = null;
try {
@@ -413,6 +429,7 @@ public class LoadAction extends RestBaseController {
throw new LoadException(e.getMessage());
}
+ String reqHostStr = req.getHeader(HttpHeaderNames.HOST.toString());
reqHostStr = reqHostStr.replaceAll("\\s+", "");
if (reqHostStr.isEmpty()) {
LOG.info("Invalid header host: {}", reqHostStr);
@@ -430,8 +447,8 @@ public class LoadAction extends RestBaseController {
throw new LoadException("Invalid header host: " + reqHost);
}
- if
(Config.streamload_redirect_policy.equalsIgnoreCase("public-private")) {
- // ip
+ if (redirectPolicy != null &&
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
+ // redirect with ip
if (InetAddressValidator.getInstance().isValid(reqHost)) {
InetAddress addr;
try {
@@ -451,7 +468,7 @@ public class LoadAction extends RestBaseController {
}
}
- // domin
+ // redirect with domain
if (publicHostPort != null &&
reqHost.toLowerCase().contains("public")) {
return new TNetworkAddress(publicHostPort.first,
publicHostPort.second);
} else if (privateHostPort != null) {
@@ -585,4 +602,15 @@ public class LoadAction extends RestBaseController {
ConnectContext.remove();
}
}
+
+ private String getAllHeaders(HttpServletRequest request) {
+ StringBuilder headers = new StringBuilder();
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String headerName = headerNames.nextElement();
+ String headerValue = request.getHeader(headerName);
+
headers.append(headerName).append(":").append(headerValue).append(", ");
+ }
+ return headers.toString();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 2029f96da3b..f2f17914a15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -80,6 +80,13 @@ public class StreamLoadHandler {
this.clientAddr = clientAddr;
}
+ /**
+ * Select a random backend in the given cloud cluster.
+ *
+ * @param clusterName cloud cluster name
+ * @param groupCommit if this selection is for group commit
+ * @throws LoadException if there is no available backend
+ */
public static Backend selectBackend(String clusterName, boolean
groupCommit) throws LoadException {
List<Backend> backends = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
index b29b3dfbe3b..3765872810d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -58,7 +58,7 @@ public class AuditStreamLoader {
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
conn.setRequestProperty("token", clusterToken);
- conn.setRequestProperty("Authorization", "Basic ");
+ conn.setRequestProperty("Authorization", "Basic YWRtaW46"); // admin
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
@@ -67,6 +67,7 @@ public class AuditStreamLoader {
conn.addRequestProperty("columns",
InternalSchema.AUDIT_SCHEMA.stream().map(c ->
c.getName()).collect(
Collectors.joining(",")));
+ conn.addRequestProperty("redirect-policy", "random-be");
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
@@ -75,13 +76,14 @@ public class AuditStreamLoader {
private String toCurl(HttpURLConnection conn) {
StringBuilder sb = new StringBuilder("curl -v ");
sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n ");
- sb.append("-H \"").append("Authorization\":").append("\"Basic
").append("\" \\\n ");
+ sb.append("-H \"").append("Authorization\":").append("\"Basic
YWRtaW46").append("\" \\\n ");
sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n
");
sb.append("-H \"").append("Content-Type\":").append("\"text/plain;
charset=UTF-8\" \\\n ");
sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n
");
sb.append("-H \"").append("columns\":")
.append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c ->
c.getName()).collect(
Collectors.joining(",")) + "\" \\\n ");
+ sb.append("-H
\"").append("redirect-policy\":").append("\"random-be").append("\" \\\n ");
sb.append("\"").append(conn.getURL()).append("\"");
return sb.toString();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]