This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 67b0631 [Enhancement] Fix bug for auditloader plugin that audit event
cannot be processed in time (#5194)
67b0631 is described below
commit 67b0631257d13f746fd3b93b4f6633d1ef373335
Author: caiconghui <[email protected]>
AuthorDate: Thu Jan 28 10:53:32 2021 +0800
[Enhancement] Fix bug for auditloader plugin that audit event cannot be
processed in time (#5194)
* [Enhancement] Fix bug that audit event cannot be processed in time
Co-authored-by: caiconghui [蔡聪辉] <[email protected]>
---
docs/en/extending-doris/audit-plugin.md | 51 +++++++++++-----------
docs/zh-CN/extending-doris/audit-plugin.md | 7 +--
.../src/main/assembly/plugin.properties | 2 +-
.../doris/plugin/audit/AuditLoaderPlugin.java | 40 ++++++++---------
4 files changed, 50 insertions(+), 50 deletions(-)
diff --git a/docs/en/extending-doris/audit-plugin.md b/docs/en/extending-doris/audit-plugin.md
index 651159d..4825390 100644
--- a/docs/en/extending-doris/audit-plugin.md
+++ b/docs/en/extending-doris/audit-plugin.md
@@ -57,31 +57,32 @@ After deployment is complete, and before installing the
plugin, you need to crea
```
create table doris_audit_tbl__
(
- query_id varchar (48) comment "Unique query id",
- time datetime not null comment "Query start time",
- client_ip varchar (32) comment "Client IP",
- user varchar (64) comment "User name",
- db varchar (96) comment "Database of this query",
- state varchar (8) comment "Query result state. EOF, ERR, OK",
- query_time bigint comment "Query execution time in millisecond",
- scan_bytes bigint comment "Total scan bytes of this query",
- scan_rows bigint comment "Total scan rows of this query",
- return_rows bigint comment "Returned rows of this query",
- stmt_id int comment "An incremental id of statement",
- is_query tinyint comment "Is this statemt a query. 1 or 0",
- frontend_ip varchar (32) comment "Frontend ip of executing this statement",
- stmt varchar (2048) comment "The original statement, trimed if longer than
2048 bytes"
-)
-partition by range (time) ()
-distributed by hash (query_id) buckets 1
-properties (
- "dynamic_partition.time_unit" = "DAY",
- "dynamic_partition.start" = "-30",
- "dynamic_partition.end" = "3",
- "dynamic_partition.prefix" = "p",
- "dynamic_partition.buckets" = "1",
- "dynamic_partition.enable" = "true",
- "replication_num" = "1"
+ query_id varchar(48) comment "Unique query id",
+ time datetime not null comment "Query start time",
+ client_ip varchar(32) comment "Client IP",
+ user varchar(64) comment "User name",
+ db varchar(96) comment "Database of this query",
+ state varchar(8) comment "Query result state. EOF, ERR, OK",
+ query_time bigint comment "Query execution time in millisecond",
+ scan_bytes bigint comment "Total scan bytes of this query",
+ scan_rows bigint comment "Total scan rows of this query",
+ return_rows bigint comment "Returned rows of this query",
+ stmt_id int comment "An incremental id of statement",
+ is_query tinyint comment "Is this statemt a query. 1 or 0",
+ frontend_ip varchar(32) comment "Frontend ip of executing this statement",
+ stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes"
+) engine=OLAP
+duplicate key(query_id, time, client_ip)
+partition by range(time) ()
+distributed by hash(query_id) buckets 1
+properties(
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-30",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "1",
+ "dynamic_partition.enable" = "true",
+ "replication_num" = "3"
);
```
diff --git a/docs/zh-CN/extending-doris/audit-plugin.md b/docs/zh-CN/extending-doris/audit-plugin.md
index 7168251..e5f9e4c 100644
--- a/docs/zh-CN/extending-doris/audit-plugin.md
+++ b/docs/zh-CN/extending-doris/audit-plugin.md
@@ -70,8 +70,9 @@ create table doris_audit_tbl__
stmt_id int comment "An incremental id of statement",
is_query tinyint comment "Is this statemt a query. 1 or 0",
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
- stmt varchar(2048) comment "The original statement, trimed if longer than
2048 bytes"
-)
+ stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes"
+) engine=OLAP
+duplicate key(query_id, time, client_ip)
partition by range(time) ()
distributed by hash(query_id) buckets 1
properties(
@@ -81,7 +82,7 @@ properties(
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "1",
"dynamic_partition.enable" = "true",
- "replication_num" = "1"
+ "replication_num" = "3"
);
```
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.properties b/fe_plugins/auditloader/src/main/assembly/plugin.properties
index 3ab896c..0331385 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.properties
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.properties
@@ -18,6 +18,6 @@
name=AuditLoader
type=AUDIT
description=load audit log to olap load, and user can view the statistic of
queries
-version=0.12.1
+version=0.13.1
java.version=1.8.0
classname=org.apache.doris.plugin.audit.AuditLoaderPlugin
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 5e7ca2d..a785768 100755
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -54,7 +54,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
private StringBuilder auditBuffer = new StringBuilder();
private long lastLoadTime = 0;
- private BlockingQueue<StringBuilder> batchQueue = new
LinkedBlockingDeque<StringBuilder>(1);
+ private BlockingQueue<AuditEvent> auditEventQueue = new LinkedBlockingDeque<AuditEvent>(1);
private DorisStreamLoader streamLoader;
private Thread loadThread;
@@ -63,7 +63,9 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
private volatile boolean isInit = false;
// the max stmt length to be loaded in audit table.
- private static final int MAX_STMT_LENGTH = 2000;
+ private static final int MAX_STMT_LENGTH = 4096;
+ // the max auditEventQueue size to store audit_event
+ private static final int MAX_AUDIT_EVENT_SIZE = 4096;
@Override
public void init(PluginInfo info, PluginContext ctx) throws
PluginException {
@@ -132,8 +134,14 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
}
public void exec(AuditEvent event) {
- assembleAudit(event);
- loadIfNecessary();
+ try {
+ auditEventQueue.add(event);
+ } catch (Exception e) {
+ // In order to ensure that the system can run normally, here we directly
+ // discard the current audit_event. If this problem occurs frequently,
+ // improvement can be considered.
+ LOG.debug("encounter exception when putting current audit batch, discard current audit event", e);
+ }
}
private void assembleAudit(AuditEvent event) {
@@ -158,7 +166,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
auditBuffer.append(stmt).append("\n");
}
- private void loadIfNecessary() {
+ private void loadIfNecessary(DorisStreamLoader loader) {
if (auditBuffer.length() < conf.maxBatchSize &&
System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
return;
}
@@ -166,15 +174,8 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
lastLoadTime = System.currentTimeMillis();
// begin to load
try {
- if (!batchQueue.isEmpty()) {
- // TODO(cmy): if queue is not empty, which means the last
batch is not processed.
- // In order to ensure that the system can run normally, here
we directly
- // discard the current batch. If this problem occurs
frequently,
- // improvement can be considered.
- throw new PluginException("The previous batch is not
processed, and the current batch is discarded.");
- }
-
- batchQueue.put(this.auditBuffer);
+ DorisStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer);
+ LOG.debug("audit loader response: {}", response);
} catch (Exception e) {
LOG.debug("encounter exception when putting current audit batch,
discard current batch", e);
} finally {
@@ -243,16 +244,13 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
public void run() {
while (!isClosed) {
try {
- StringBuilder batch = batchQueue.poll(5, TimeUnit.SECONDS);
- if (batch == null) {
- continue;
+ AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
+ if (event != null) {
+ assembleAudit(event);
+ loadIfNecessary(loader);
}
-
- DorisStreamLoader.LoadResponse response =
loader.loadBatch(batch);
- LOG.debug("audit loader response: {}", response);
} catch (InterruptedException e) {
LOG.debug("encounter exception when loading current audit
batch", e);
- continue;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]