This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ad1f6350701 [Feature](auditloader) Plugin auditloader use auth token
to avoid using cleartext passwords in config (#26278)
ad1f6350701 is described below
commit ad1f6350701516c82509da0612574567989bd7b6
Author: zhiqiang <[email protected]>
AuthorDate: Tue Nov 7 05:14:57 2023 -0600
[Feature](auditloader) Plugin auditloader use auth token to avoid using
cleartext passwords in config (#26278)
Doris FE will check if stream load http request has auth token after
checking password failed;
Plugin audit-log loader can use auth token if plugin config set
use_auth_token to true
Co-authored-by: Mingyu Chen <[email protected]>
---
.../org/apache/doris/httpv2/rest/LoadAction.java | 112 ++++++++++++++++++++-
.../auditloader/src/main/assembly/plugin.conf | 3 +
.../doris/plugin/audit/AuditLoaderPlugin.java | 20 +++-
.../doris/plugin/audit/DorisStreamLoader.java | 11 +-
4 files changed, 138 insertions(+), 8 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 9c5902d748a..ff6e98a6c78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
+import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
@@ -43,6 +44,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
+import java.net.URI;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -81,7 +83,20 @@ public class LoadAction extends RestBaseController {
return redirectToHttps(request);
}
- executeCheckPassword(request, response);
+ try {
+ executeCheckPassword(request, response);
+ } catch (UnauthorizedException unauthorizedException) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check password failed, going to check auth token,
request: {}", request.toString());
+ }
+
+ if (!checkClusterToken(request)) {
+ throw unauthorizedException;
+ } else {
+ return executeWithClusterToken(request, db, table, true);
+ }
+ }
+
return executeWithoutPassword(request, response, db, table, true);
}
@@ -257,4 +272,99 @@ public class LoadAction extends RestBaseController {
}
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
+
+ // NOTE: This function can only be used for AuditlogPlugin stream load for
now.
+ // AuditlogPlugin should be re-disigned carefully, and blow method focuses
on
+ // temporarily addressing the users' needs for audit logs.
+ // So this function is not widely tested under general scenario
+ private boolean checkClusterToken(HttpServletRequest request) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking cluser token, request {}", request.toString());
+ }
+
+ String authToken = request.getHeader("token");
+
+ if (Strings.isNullOrEmpty(authToken)) {
+ return false;
+ }
+
+ return
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
+ }
+
+ // NOTE: This function can only be used for AuditlogPlugin stream load for
now.
+ // AuditlogPlugin should be re-disigned carefully, and blow method focuses
on
+ // temporarily addressing the users' needs for audit logs.
+ // So this function is not widely tested under general scenario
+ private Object executeWithClusterToken(HttpServletRequest request, String
db,
+ String table, boolean isStreamLoad) {
+ try {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setEnv(Env.getCurrentEnv());
+ ctx.setThreadLocalInfo();
+ ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+ ctx.setRemoteIP(request.getRemoteAddr());
+
+ String dbName = db;
+ String tableName = table;
+ // A 'Load' request must have 100-continue header
+ if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
+ return new RestBaseResult("There is no 100-continue header");
+ }
+
+ final String clusterName = ConnectContext.get().getClusterName();
+ if (Strings.isNullOrEmpty(clusterName)) {
+ return new RestBaseResult("No cluster selected.");
+ }
+
+ if (Strings.isNullOrEmpty(dbName)) {
+ return new RestBaseResult("No database selected.");
+ }
+
+ if (Strings.isNullOrEmpty(tableName)) {
+ return new RestBaseResult("No table selected.");
+ }
+
+ String label = request.getParameter(LABEL_KEY);
+ if (isStreamLoad) {
+ label = request.getHeader(LABEL_KEY);
+ }
+
+ if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
+ // for stream load, the label can be generated by system
automatically
+ return new RestBaseResult("No label selected.");
+ }
+
+ TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
+
+ LOG.info("Redirect load action with auth token to destination={},"
+ + "stream: {}, db: {}, tbl: {}, label: {}",
+ redirectAddr.toString(), isStreamLoad, dbName, tableName,
label);
+
+ URI urlObj = null;
+ URI resultUriObj = null;
+ String urlStr = request.getRequestURI();
+ String userInfo = null;
+
+ try {
+ urlObj = new URI(urlStr);
+ resultUriObj = new URI("http", userInfo,
redirectAddr.getHostname(),
+ redirectAddr.getPort(), urlObj.getPath(), "", null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ String redirectUrl = resultUriObj.toASCIIString();
+ if (!Strings.isNullOrEmpty(request.getQueryString())) {
+ redirectUrl += request.getQueryString();
+ }
+ LOG.info("Redirect url: {}", redirectUrl);
+ RedirectView redirectView = new RedirectView(redirectUrl);
+ redirectView.setContentType("text/html;charset=utf-8");
+
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+
+ return redirectView;
+ } catch (Exception e) {
+ LOG.warn("Failed to execute stream load with cluster token, {}",
e);
+ return new RestBaseResult(e.getMessage());
+ }
+ }
}
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf
b/fe_plugins/auditloader/src/main/assembly/plugin.conf
index 31f7bd3f356..aec8724fd96 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.conf
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf
@@ -51,3 +51,6 @@ user=root
# Doris user's password
password=
+# Use doris cluster token for stream load authorization, if true, user and
password will be ignored.
+use_auth_token=false
+
diff --git
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 2aa1246dd6b..3cfb0eeeaee 100755
---
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -18,6 +18,7 @@
package org.apache.doris.plugin.audit;
import org.apache.doris.common.Config;
+import org.apache.doris.catalog.Env;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditPlugin;
import org.apache.doris.plugin.Plugin;
@@ -84,7 +85,6 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
this.lastLoadTimeSlowLog = System.currentTimeMillis();
loadConfig(ctx, info.getProperties());
-
this.auditEventQueue =
Queues.newLinkedBlockingDeque(conf.maxQueueSize);
this.streamLoader = new DorisStreamLoader(conf);
this.loadThread = new Thread(new LoadWorker(this.streamLoader),
"audit loader thread");
@@ -209,7 +209,16 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
if (logBuffer.length() >= conf.maxBatchSize || currentTime -
lastLoadTime >= conf.maxBatchIntervalSec * 1000) {
// begin to load
try {
- DorisStreamLoader.LoadResponse response =
loader.loadBatch(logBuffer, slowLog);
+ String token = "";
+ if (conf.use_auth_token) {
+ try {
+ // Acquire token from master
+ token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ } catch (Exception e) {
+ LOG.error("Failed to get auth token: {}", e);
+ }
+ }
+ DorisStreamLoader.LoadResponse response =
loader.loadBatch(logBuffer, slowLog, token);
LOG.debug("audit loader response: {}", response);
} catch (Exception e) {
LOG.debug("encounter exception when putting current audit
batch, discard current batch", e);
@@ -248,6 +257,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
// the max stmt length to be loaded in audit table.
public static final String MAX_STMT_LENGTH = "max_stmt_length";
+ public static final String USE_AUTH_TOKEN = "use_auth_token";
public long maxBatchSize = 50 * 1024 * 1024;
public long maxBatchIntervalSec = 60;
@@ -262,6 +272,9 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
// the identity of FE which run this plugin
public String feIdentity = "";
public int max_stmt_length = 4096;
+ // auth_token is not used by default
+ public boolean use_auth_token = false;
+
public void init(Map<String, String> properties) throws
PluginException {
try {
@@ -302,6 +315,9 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
if (properties.containsKey(MAX_STMT_LENGTH)) {
max_stmt_length =
Integer.parseInt(properties.get(MAX_STMT_LENGTH));
}
+ if (properties.containsKey(USE_AUTH_TOKEN)) {
+ use_auth_token =
Boolean.valueOf(properties.get(USE_AUTH_TOKEN));
+ }
} catch (Exception e) {
throw new PluginException(e.getMessage());
}
diff --git
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
index 2781bcc2073..d389f0dfa81 100644
---
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
+++
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
@@ -59,11 +59,12 @@ public class DorisStreamLoader {
this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
}
- private HttpURLConnection getConnection(String urlStr, String label)
throws IOException {
+ private HttpURLConnection getConnection(String urlStr, String label,
String clusterToken) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
+ conn.setRequestProperty("token", clusterToken);
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,7 +115,7 @@ public class DorisStreamLoader {
return response.toString();
}
- public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
+ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog, String
clusterToken) {
Calendar calendar = Calendar.getInstance();
String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1,
calendar.get(Calendar.DAY_OF_MONTH),
@@ -127,10 +128,10 @@ public class DorisStreamLoader {
// build request and send to fe
if (slowLog) {
label = "slow" + label;
- feConn = getConnection(slowLogLoadUrlStr, label);
+ feConn = getConnection(slowLogLoadUrlStr, label, clusterToken);
} else {
label = "audit" + label;
- feConn = getConnection(auditLogLoadUrlStr, label);
+ feConn = getConnection(auditLogLoadUrlStr, label,
clusterToken);
}
int status = feConn.getResponseCode();
// fe send back http response code TEMPORARY_REDIRECT 307 and new
be location
@@ -143,7 +144,7 @@ public class DorisStreamLoader {
throw new Exception("redirect location is null");
}
// build request and send to new be location
- beConn = getConnection(location, label);
+ beConn = getConnection(location, label, clusterToken);
// send data to be
try (BufferedOutputStream bos = new
BufferedOutputStream(beConn.getOutputStream())) {
bos.write(sb.toString().getBytes());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]