This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 31bffdb5fc [enhancement](stats) audit for stats collection #24074
31bffdb5fc is described below
commit 31bffdb5fc307a970880c483227d58795e860799
Author: AKIRA <[email protected]>
AuthorDate: Mon Sep 11 09:26:12 2023 +0900
[enhancement](stats) audit for stats collection #24074
Log stats collection SQLs in the audit log.
---
.../apache/doris/blockrule/SqlBlockRuleMgr.java | 8 ++
.../java/org/apache/doris/qe/AuditLogHelper.java | 120 +++++++++++++++++++++
.../java/org/apache/doris/qe/ConnectProcessor.java | 84 +--------------
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +
.../apache/doris/statistics/BaseAnalysisTask.java | 25 +++--
5 files changed, 147 insertions(+), 92 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
index 1bc5505edf..ca4e68a6ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlBlockUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -224,6 +225,10 @@ public class SqlBlockRuleMgr implements Writable {
* Match SQL according to rules.
**/
public void matchSql(String originSql, String sqlHash, String user) throws
AnalysisException {
+ if (ConnectContext.get() != null
+ && ConnectContext.get().getSessionVariable().internalSession) {
+ return;
+ }
// match global rule
List<SqlBlockRule> globalRules =
nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList());
@@ -260,6 +265,9 @@ public class SqlBlockRuleMgr implements Writable {
**/
public void checkLimitations(Long partitionNum, Long tabletNum, Long
cardinality, String user)
throws AnalysisException {
+ if (ConnectContext.get().getSessionVariable().internalSession) {
+ return;
+ }
// match global rule
List<SqlBlockRule> globalRules =
nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
new file mode 100644
index 0000000000..40f870eee1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.InsertStmt;
+import org.apache.doris.analysis.Queriable;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.FrontendOptions;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.context.Context;
+import org.apache.commons.codec.digest.DigestUtils;
+
+public class AuditLogHelper {
+
+ public static void logAuditLog(ConnectContext ctx, String origStmt,
StatementBase parsedStmt,
+ org.apache.doris.proto.Data.PQueryStatistics statistics, boolean
printFuzzyVariables) {
+ origStmt = origStmt.replace("\n", " ");
+ // slow query
+ long endTime = System.currentTimeMillis();
+ long elapseMs = endTime - ctx.getStartTime();
+ SpanContext spanContext =
Span.fromContext(Context.current()).getSpanContext();
+
+ ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
+ .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
+ .setState(ctx.getState().toString())
+ .setErrorCode(ctx.getState().getErrorCode() == null ? 0 :
ctx.getState().getErrorCode().getCode())
+ .setErrorMessage((ctx.getState().getErrorMessage() == null ?
"" :
+ ctx.getState().getErrorMessage().replace("\n", "
").replace("\t", " ")))
+ .setQueryTime(elapseMs)
+ .setScanBytes(statistics == null ? 0 :
statistics.getScanBytes())
+ .setScanRows(statistics == null ? 0 : statistics.getScanRows())
+ .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
+ .setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
+ .setReturnRows(ctx.getReturnRows())
+ .setStmtId(ctx.getStmtId())
+ .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
+ .setTraceId(spanContext.isValid() ? spanContext.getTraceId() :
"")
+ .setWorkloadGroup(ctx.getWorkloadGroupName())
+ .setFuzzyVariables(!printFuzzyVariables ? "" :
ctx.getSessionVariable().printFuzzyVariables());
+
+ if (ctx.getState().isQuery()) {
+ MetricRepo.COUNTER_QUERY_ALL.increase(1L);
+
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
+ if (ctx.getState().getStateType() == MysqlStateType.ERR
+ && ctx.getState().getErrType() !=
QueryState.ErrType.ANALYSIS_ERR) {
+ // err query
+ MetricRepo.COUNTER_QUERY_ERR.increase(1L);
+
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
+ } else if (ctx.getState().getStateType() == MysqlStateType.OK
+ || ctx.getState().getStateType() == MysqlStateType.EOF) {
+ // ok query
+ MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
+
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
+
+ if (elapseMs > Config.qe_slow_log_ms) {
+ String sqlDigest = DigestUtils.md5Hex(((Queriable)
parsedStmt).toDigest());
+ ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
+ }
+ }
+ ctx.getAuditEventBuilder().setIsQuery(true);
+ if (ctx.getQueryDetail() != null) {
+ ctx.getQueryDetail().setEventTime(endTime);
+ ctx.getQueryDetail().setEndTime(endTime);
+ ctx.getQueryDetail().setLatency(elapseMs);
+
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
+ QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
+ ctx.setQueryDetail(null);
+ }
+ } else {
+ ctx.getAuditEventBuilder().setIsQuery(false);
+ }
+ ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
+
+
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
+
+ // We put origin query stmt at the end of audit log, for parsing the
log more convenient.
+ if (!ctx.getState().isQuery() && (parsedStmt != null &&
parsedStmt.needAuditEncryption())) {
+ ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
+ } else {
+ if (parsedStmt instanceof InsertStmt && !((InsertStmt)
parsedStmt).needLoadManager()
+ && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
+ // INSERT INTO VALUES may be very long, so we only log at most
1K bytes.
+ int length = Math.min(1024, origStmt.length());
+ ctx.getAuditEventBuilder().setStmt(origStmt.substring(0,
length));
+ } else {
+ ctx.getAuditEventBuilder().setStmt(origStmt);
+ }
+ }
+ if (!Env.getCurrentEnv().isMaster()) {
+ if (ctx.executor.isForwardToMaster()) {
+
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
+ }
+ }
+
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 73c6debb69..80a4eb80e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
-import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@@ -34,7 +33,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -56,17 +54,14 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
-import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.proto.Data;
import org.apache.doris.qe.QueryState.MysqlStateType;
-import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
@@ -279,84 +274,7 @@ public class ConnectProcessor {
private void auditAfterExec(String origStmt, StatementBase parsedStmt,
Data.PQueryStatistics statistics, boolean
printFuzzyVariables) {
- origStmt = origStmt.replace("\n", " ");
- // slow query
- long endTime = System.currentTimeMillis();
- long elapseMs = endTime - ctx.getStartTime();
- SpanContext spanContext =
Span.fromContext(Context.current()).getSpanContext();
-
- ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
- .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
- .setState(ctx.getState().toString())
- .setErrorCode(ctx.getState().getErrorCode() == null ? 0 :
ctx.getState().getErrorCode().getCode())
- .setErrorMessage((ctx.getState().getErrorMessage() == null ?
"" :
- ctx.getState().getErrorMessage().replace("\n", "
").replace("\t", " ")))
- .setQueryTime(elapseMs)
- .setScanBytes(statistics == null ? 0 :
statistics.getScanBytes())
- .setScanRows(statistics == null ? 0 : statistics.getScanRows())
- .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
- .setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
- .setReturnRows(ctx.getReturnRows())
- .setStmtId(ctx.getStmtId())
- .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
- .setTraceId(spanContext.isValid() ? spanContext.getTraceId() :
"")
- .setWorkloadGroup(ctx.getWorkloadGroupName())
- .setFuzzyVariables(!printFuzzyVariables ? "" :
ctx.getSessionVariable().printFuzzyVariables());
-
- if (ctx.getState().isQuery()) {
- MetricRepo.COUNTER_QUERY_ALL.increase(1L);
-
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
- if (ctx.getState().getStateType() == MysqlStateType.ERR
- && ctx.getState().getErrType() !=
QueryState.ErrType.ANALYSIS_ERR) {
- // err query
- MetricRepo.COUNTER_QUERY_ERR.increase(1L);
-
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
- } else if (ctx.getState().getStateType() == MysqlStateType.OK
- || ctx.getState().getStateType() == MysqlStateType.EOF) {
- // ok query
- MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
-
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
-
- if (elapseMs > Config.qe_slow_log_ms) {
- String sqlDigest = DigestUtils.md5Hex(((Queriable)
parsedStmt).toDigest());
- ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
- }
- }
- ctx.getAuditEventBuilder().setIsQuery(true);
- if (ctx.getQueryDetail() != null) {
- ctx.getQueryDetail().setEventTime(endTime);
- ctx.getQueryDetail().setEndTime(endTime);
- ctx.getQueryDetail().setLatency(elapseMs);
-
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
- QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
- ctx.setQueryDetail(null);
- }
- } else {
- ctx.getAuditEventBuilder().setIsQuery(false);
- }
- ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
-
-
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
-
- // We put origin query stmt at the end of audit log, for parsing the
log more convenient.
- if (!ctx.getState().isQuery() && (parsedStmt != null &&
parsedStmt.needAuditEncryption())) {
- ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
- } else {
- if (parsedStmt instanceof InsertStmt && !((InsertStmt)
parsedStmt).needLoadManager()
- && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
- // INSERT INTO VALUES may be very long, so we only log at most
1K bytes.
- int length = Math.min(1024, origStmt.length());
- ctx.getAuditEventBuilder().setStmt(origStmt.substring(0,
length));
- } else {
- ctx.getAuditEventBuilder().setStmt(origStmt);
- }
- }
- if (!Env.getCurrentEnv().isMaster()) {
- if (ctx.executor.isForwardToMaster()) {
-
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
- }
- }
-
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+ AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics,
printFuzzyVariables);
}
// Process COM_QUERY statement,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3ab4f31623..a4d1e1880a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2622,6 +2622,8 @@ public class StmtExecutor {
fetchResultSpan.end();
}
} finally {
+ AuditLogHelper.logAuditLog(context, originStmt.toString(),
parsedStmt, getQueryStatisticsForAuditLog(),
+ true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 0323929081..48f2e0e86a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
@@ -188,7 +189,7 @@ public abstract class BaseAnalysisTask {
protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
- .updateTaskStatus(info, AnalysisState.RUNNING, "",
System.currentTimeMillis());
+ .updateTaskStatus(info, AnalysisState.RUNNING, "",
System.currentTimeMillis());
}
public void cancel() {
@@ -228,8 +229,8 @@ public abstract class BaseAnalysisTask {
@Override
public String toString() {
return String.format("Job id [%d], Task id [%d], catalog [%s], db
[%s], table [%s], column [%s]",
- info.jobId, info.taskId, catalog.getName(), db.getFullName(),
tbl.getName(),
- col == null ? "TableRowCount" : col.getName());
+ info.jobId, info.taskId, catalog.getName(), db.getFullName(),
tbl.getName(),
+ col == null ? "TableRowCount" : col.getName());
}
protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor)
throws Exception {
@@ -237,12 +238,18 @@ public abstract class BaseAnalysisTask {
return;
}
LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
- stmtExecutor.execute();
- QueryState queryState = stmtExecutor.getContext().getState();
- if (queryState.getStateType().equals(MysqlStateType.ERR)) {
- throw new RuntimeException(String.format("Failed to analyze
%s.%s.%s, error: %s sql: %s",
- info.catalogName, info.dbName, info.colName,
stmtExecutor.getOriginStmt().toString(),
- queryState.getErrorMessage()));
+ try {
+ stmtExecutor.execute();
+ QueryState queryState = stmtExecutor.getContext().getState();
+ if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+ throw new RuntimeException(String.format("Failed to analyze
%s.%s.%s, error: %s sql: %s",
+ info.catalogName, info.dbName, info.colName,
stmtExecutor.getOriginStmt().toString(),
+ queryState.getErrorMessage()));
+ }
+ } finally {
+ AuditLogHelper.logAuditLog(stmtExecutor.getContext(),
stmtExecutor.getOriginStmt().toString(),
+ stmtExecutor.getParsedStmt(),
stmtExecutor.getQueryStatisticsForAuditLog(),
+ true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]