This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c3543510883 [fix](auditlog) add missing audit log fields and duplicate audit log error (#42262) c3543510883 is described below commit c3543510883bc0f67e2656ca2a415da08434c21a Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Sat Nov 2 11:20:19 2024 +0800 [fix](auditlog) add missing audit log fields and duplicate audit log error (#42262) ### What problem does this PR solve? Problem Summary: #### Issue 1 Some fields are missing from the audit log table. This PR adds all of them: - shuffle_send_rows - shuffle_send_bytes - scan_bytes_from_local_storage - scan_bytes_from_remote_storage - is_nereids - compute_group Note that the compute group was previously labeled `cloudClusterName` in fe.audit.log, which was incorrect, so it is now labeled `ComputeGroupName` there and stored in the `compute_group` column of the audit log table. After this PR, all these fields will be saved in both the audit log table and fe.audit.log. #### Issue 2 The `AuditEventBuilder` needs to be reset on each run; otherwise, there will be duplicate audit log entries. #### Issue 3 Add a new statement, `call flush_audit_log()`. It flushes the audit log to the audit_log table immediately. This is useful in test cases, so that we don't need to wait 1 minute for the audit log data to be flushed. 
### Release note [fix](auditlog) add missing audit log fields and duplicate audit log error --- .../org/apache/doris/catalog/InternalSchema.java | 17 +++++++ .../nereids/trees/plans/commands/CallCommand.java | 13 +++++ .../plans/commands/call/CallFlushAuditLogFunc.java | 57 ++++++++++++++++++++++ .../trees/plans/commands/call/CallFunc.java | 2 + .../java/org/apache/doris/plugin/AuditEvent.java | 14 +++--- .../java/org/apache/doris/plugin/PluginMgr.java | 13 ++++- .../org/apache/doris/plugin/audit/AuditLoader.java | 29 ++++++----- .../apache/doris/plugin/audit/AuditLogBuilder.java | 2 +- .../java/org/apache/doris/qe/AuditLogHelper.java | 2 + .../data/audit/test_audit_log_behavior.out | 32 ++++++++++++ .../suites/audit/test_audit_log_behavior.groovy | 9 +++- 11 files changed, 165 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java index 768ae22d202..a571334660a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -139,9 +139,23 @@ public class InternalSchema { AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA .add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA + .add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT), + ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA + .add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT), + ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA + .add(new ColumnDef("scan_bytes_from_local_storage", TypeDef.create(PrimitiveType.BIGINT), + ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA + .add(new ColumnDef("scan_bytes_from_remote_storage", TypeDef.create(PrimitiveType.BIGINT), + ColumnNullableType.NULLABLE)); 
AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA.add(new ColumnDef("stmt_type", TypeDef.createVarchar(48), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add( + new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA .add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); @@ -151,6 +165,9 @@ public class InternalSchema { new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA.add( new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add( + new ColumnDef("compute_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE)); + // Keep stmt as last column. 
So that in fe.audit.log, it will be easier to get sql string AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java index 81d950eab6a..898b4c9ecc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StmtType; import org.apache.doris.nereids.analyzer.UnboundFunction; import org.apache.doris.nereids.trees.plans.PlanType; @@ -63,4 +64,16 @@ public class CallCommand extends Command implements ForwardWithSync { public StmtType stmtType() { return StmtType.CALL; } + + @Override + public RedirectStatus toRedirectStatus() { + // Some of call statements may need to be redirected, some may not + String funcName = unboundFunction.getName().toUpperCase(); + switch (funcName) { + case "FLUSH_AUDIT_LOG": + return RedirectStatus.NO_FORWARD; + default: + return RedirectStatus.FORWARD_WITH_SYNC; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java new file mode 100644 index 00000000000..60cae55e7f5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.call; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; + +import java.util.List; + +/** + * call flush_audit_log() + * This will flush audit log immediately to the audit_log table. + * Mainly for test cases, so that we don't need to wait 60 sec to flush the audit log. 
+ */ +public class CallFlushAuditLogFunc extends CallFunc { + + private UserIdentity user; + + private CallFlushAuditLogFunc(UserIdentity user) { + this.user = user; + } + + public static CallFunc create(UserIdentity user, List<Expression> args) { + if (!args.isEmpty()) { + throw new AnalysisException("FLUSH_AUDIT_LOG function requires no parameter"); + } + return new CallFlushAuditLogFunc(user); + } + + @Override + public void run() { + // check priv + if (!Env.getCurrentEnv().getAuth().checkGlobalPriv(user, PrivPredicate.ADMIN)) { + throw new AnalysisException("Only admin can flush audit log"); + } + // flush audit log + Env.getCurrentEnv().getPluginMgr().flushAuditLog(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java index 4a8cf560c28..4eac9c6f9b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java @@ -36,6 +36,8 @@ public abstract class CallFunc { // TODO, built-in functions require a separate management case "EXECUTE_STMT": // Call built-in functions first return CallExecuteStmtFunc.create(user, unboundFunction.getArguments()); + case "FLUSH_AUDIT_LOG": + return CallFlushAuditLogFunc.create(user, unboundFunction.getArguments()); default: return CallProcedure.create(ctx, originSql); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 55a8b00d2e8..8a4ca5db749 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -80,9 +80,9 @@ public class AuditEvent { public String queryId = ""; @AuditField(value = "IsQuery") public boolean isQuery = false; - @AuditField(value = 
"isNereids") + @AuditField(value = "IsNereids") public boolean isNereids = false; - @AuditField(value = "feIp") + @AuditField(value = "FeIp") public String feIp = ""; @AuditField(value = "StmtType") public String stmtType = ""; @@ -96,22 +96,20 @@ public class AuditEvent { public long shuffleSendRows = -1; @AuditField(value = "SqlHash") public String sqlHash = ""; - @AuditField(value = "peakMemoryBytes") + @AuditField(value = "PeakMemoryBytes") public long peakMemoryBytes = -1; @AuditField(value = "SqlDigest") public String sqlDigest = ""; - @AuditField(value = "cloudClusterName") + @AuditField(value = "ComputeGroupName") public String cloudClusterName = ""; - @AuditField(value = "TraceId") - public String traceId = ""; @AuditField(value = "WorkloadGroup") public String workloadGroup = ""; // note: newly added fields should be always before fuzzyVariables @AuditField(value = "FuzzyVariables") public String fuzzyVariables = ""; - @AuditField(value = "scanBytesFromLocalStorage") + @AuditField(value = "ScanBytesFromLocalStorage") public long scanBytesFromLocalStorage = -1; - @AuditField(value = "scanBytesFromRemoteStorage") + @AuditField(value = "ScanBytesFromRemoteStorage") public long scanBytesFromRemoteStorage = -1; public long pushToAuditLogQueueTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java index daf93f44d96..2ab7de1529c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java @@ -61,6 +61,9 @@ public class PluginMgr implements Writable { // all dynamic plugins should have unique names, private final Set<String> dynamicPluginNames; + // Save this handler for external call + private AuditLoader auditLoader = null; + public PluginMgr() { plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE]; for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) { @@ -113,8 +116,8 @@ public 
class PluginMgr implements Writable { } // AuditLoader: log audit log to internal table - AuditLoader auditLoaderPlugin = new AuditLoader(); - if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) { + this.auditLoader = new AuditLoader(); + if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) { LOG.warn("failed to register audit log builder"); } @@ -363,6 +366,12 @@ public class PluginMgr implements Writable { return rows; } + public void flushAuditLog() { + if (auditLoader != null) { + auditLoader.loadIfNecessary(true); + } + } + public void readFields(DataInputStream dis) throws IOException { int size = dis.readInt(); for (int i = 0; i < size; i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 27193856937..06789b864b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -35,8 +35,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -48,9 +46,6 @@ public class AuditLoader extends Plugin implements AuditPlugin { public static final String AUDIT_LOG_TABLE = "audit_log"; - private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZone(ZoneId.systemDefault()); - private StringBuilder auditLogBuffer = new StringBuilder(); private int auditLogNum = 0; private long lastLoadTimeAuditLog = 0; @@ -90,7 +85,7 @@ public class AuditLoader extends Plugin implements AuditPlugin { // GlobalVariable.audit_plugin_max_batch_bytes. 
this.auditEventQueue = Queues.newLinkedBlockingDeque(100000); this.streamLoader = new AuditStreamLoader(); - this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); + this.loadThread = new Thread(new LoadWorker(), "audit loader thread"); this.loadThread.start(); isInit = true; @@ -143,6 +138,7 @@ public class AuditLoader extends Plugin implements AuditPlugin { } private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { + // should be same order as InternalSchema.AUDIT_SCHEMA logBuffer.append(event.queryId).append("\t"); logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t"); logBuffer.append(event.clientIp).append("\t"); @@ -156,15 +152,21 @@ public class AuditLoader extends Plugin implements AuditPlugin { logBuffer.append(event.scanBytes).append("\t"); logBuffer.append(event.scanRows).append("\t"); logBuffer.append(event.returnRows).append("\t"); + logBuffer.append(event.shuffleSendRows).append("\t"); + logBuffer.append(event.shuffleSendBytes).append("\t"); + logBuffer.append(event.scanBytesFromLocalStorage).append("\t"); + logBuffer.append(event.scanBytesFromRemoteStorage).append("\t"); logBuffer.append(event.stmtId).append("\t"); logBuffer.append(event.stmtType).append("\t"); logBuffer.append(event.isQuery ? 1 : 0).append("\t"); + logBuffer.append(event.isNereids ? 
1 : 0).append("\t"); logBuffer.append(event.feIp).append("\t"); logBuffer.append(event.cpuTimeMs).append("\t"); logBuffer.append(event.sqlHash).append("\t"); logBuffer.append(event.sqlDigest).append("\t"); logBuffer.append(event.peakMemoryBytes).append("\t"); logBuffer.append(event.workloadGroup).append("\t"); + logBuffer.append(event.cloudClusterName).append("\t"); // already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog String stmt = event.stmt; if (LOG.isDebugEnabled()) { @@ -173,10 +175,12 @@ public class AuditLoader extends Plugin implements AuditPlugin { logBuffer.append(stmt).append("\n"); } - private void loadIfNecessary(AuditStreamLoader loader) { + // public for external call. + // synchronized to avoid concurrent load. + public synchronized void loadIfNecessary(boolean force) { long currentTime = System.currentTimeMillis(); - if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes + if (force || auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes || currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) { // begin to load try { @@ -189,7 +193,7 @@ public class AuditLoader extends Plugin implements AuditPlugin { discardLogNum += auditLogNum; return; } - AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token); + AuditStreamLoader.LoadResponse response = streamLoader.loadBatch(auditLogBuffer, token); if (LOG.isDebugEnabled()) { LOG.debug("audit loader response: {}", response); } @@ -215,10 +219,8 @@ public class AuditLoader extends Plugin implements AuditPlugin { } private class LoadWorker implements Runnable { - private AuditStreamLoader loader; - public LoadWorker(AuditStreamLoader loader) { - this.loader = loader; + public LoadWorker() { } public void run() { @@ -228,7 +230,7 @@ public class AuditLoader extends Plugin implements AuditPlugin { if (event != null) { assembleAudit(event); // process all audit logs - loadIfNecessary(loader); + 
loadIfNecessary(false); } } catch (InterruptedException ie) { if (LOG.isDebugEnabled()) { @@ -241,3 +243,4 @@ public class AuditLoader extends Plugin implements AuditPlugin { } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index 8d9e2c9d96e..161a5830b9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -116,7 +116,7 @@ public class AuditLogBuilder extends Plugin implements AuditPlugin { if (af.value().equals("Time(ms)")) { queryTime = (long) f.get(event); } - sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event))); + sb.append("|").append(af.value()).append("=").append(f.get(event)); } String auditLog = sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 2bd3cf9c335..5d24452e092 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -187,6 +187,8 @@ public class AuditLogHelper { String cluster = Config.isCloudMode() ? cloudCluster : ""; AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); + // ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query. + auditEventBuilder.reset(); auditEventBuilder .setTimestamp(ctx.getStartTime()) .setClientIp(ctx.getClientIP()) diff --git a/regression-test/data/audit/test_audit_log_behavior.out b/regression-test/data/audit/test_audit_log_behavior.out new file mode 100644 index 00000000000..904fa6299e3 --- /dev/null +++ b/regression-test/data/audit/test_audit_log_behavior.out @@ -0,0 +1,32 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !audit_log_schema -- +query_id varchar(48) Yes true \N +time datetime(3) Yes true \N +client_ip varchar(128) Yes true \N +user varchar(128) Yes false \N NONE +catalog varchar(128) Yes false \N NONE +db varchar(128) Yes false \N NONE +state varchar(128) Yes false \N NONE +error_code int Yes false \N NONE +error_message text Yes false \N NONE +query_time bigint Yes false \N NONE +scan_bytes bigint Yes false \N NONE +scan_rows bigint Yes false \N NONE +return_rows bigint Yes false \N NONE +shuffle_send_rows bigint Yes false \N NONE +shuffle_send_bytes bigint Yes false \N NONE +scan_bytes_from_local_storage bigint Yes false \N NONE +scan_bytes_from_remote_storage bigint Yes false \N NONE +stmt_id bigint Yes false \N NONE +stmt_type varchar(48) Yes false \N NONE +is_query tinyint Yes false \N NONE +is_nereids tinyint Yes false \N NONE +frontend_ip varchar(128) Yes false \N NONE +cpu_time_ms bigint Yes false \N NONE +sql_hash varchar(128) Yes false \N NONE +sql_digest varchar(128) Yes false \N NONE +peak_memory_bytes bigint Yes false \N NONE +workload_group text Yes false \N NONE +compute_group text Yes false \N NONE +stmt text Yes false \N NONE + diff --git a/regression-test/suites/audit/test_audit_log_behavior.groovy b/regression-test/suites/audit/test_audit_log_behavior.groovy index 2829474560e..c895abd2c3f 100644 --- a/regression-test/suites/audit/test_audit_log_behavior.groovy +++ b/regression-test/suites/audit/test_audit_log_behavior.groovy @@ -19,7 +19,7 @@ suite("test_audit_log_behavior") { try { sql "set global enable_audit_plugin = true" sql "set global audit_plugin_max_sql_length = 58" - sql "set global audit_plugin_max_batch_interval_sec = 1" + // sql "set global audit_plugin_max_batch_interval_sec = 1" } catch (Exception e) { log.warn("skip this case, because " + e.getMessage()) assertTrue(e.getMessage().toUpperCase().contains("ADMIN")) @@ -71,6 +71,8 @@ suite("test_audit_log_behavior") { ] ] + 
qt_audit_log_schema """desc internal.__internal_schema.audit_log""" + for (def on : [true, false]) { sql "set enable_nereids_planner=${on}" sql "truncate table __internal_schema.audit_log" @@ -80,6 +82,10 @@ suite("test_audit_log_behavior") { sql tuple2[0] } + // make sure audit event is created. + // see WorkloadRuntimeStatusMgr.getQueryNeedAudit() + Thread.sleep(6000) + sql """call flush_audit_log()""" // check result for (int i = 0; i < cnt; i++) { def tuple2 = sqls.get(i) @@ -96,6 +102,7 @@ suite("test_audit_log_behavior") { assertEquals(res[0][0].toString(), tuple2[1].toString()) } } + // do not turn off sql "set global enable_audit_plugin = false" sql "set global audit_plugin_max_sql_length = 4096" sql "set global audit_plugin_max_batch_interval_sec = 60" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org