This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 67b0631  [Enhancement] Fix bug for auditloader plugin that audit event 
cannot be processed in time (#5194)
67b0631 is described below

commit 67b0631257d13f746fd3b93b4f6633d1ef373335
Author: caiconghui <[email protected]>
AuthorDate: Thu Jan 28 10:53:32 2021 +0800

    [Enhancement] Fix bug for auditloader plugin that audit event cannot be 
processed in time (#5194)
    
    * [Enhancement] Fix bug that audit event cannot be processed in time
    
    Co-authored-by: caiconghui [蔡聪辉] <[email protected]>
---
 docs/en/extending-doris/audit-plugin.md            | 51 +++++++++++-----------
 docs/zh-CN/extending-doris/audit-plugin.md         |  7 +--
 .../src/main/assembly/plugin.properties            |  2 +-
 .../doris/plugin/audit/AuditLoaderPlugin.java      | 40 ++++++++---------
 4 files changed, 50 insertions(+), 50 deletions(-)

diff --git a/docs/en/extending-doris/audit-plugin.md 
b/docs/en/extending-doris/audit-plugin.md
index 651159d..4825390 100644
--- a/docs/en/extending-doris/audit-plugin.md
+++ b/docs/en/extending-doris/audit-plugin.md
@@ -57,31 +57,32 @@ After deployment is complete, and before installing the 
plugin, you need to crea
 ```
 create table doris_audit_tbl__
 (
-    query_id varchar (48) comment "Unique query id",
-    time datetime not null comment "Query start time",
-    client_ip varchar (32) comment "Client IP",
-    user varchar (64) comment "User name",
-    db varchar (96) comment "Database of this query",
-    state varchar (8) comment "Query result state. EOF, ERR, OK",
-    query_time bigint comment "Query execution time in millisecond",
-    scan_bytes bigint comment "Total scan bytes of this query",
-    scan_rows bigint comment "Total scan rows of this query",
-    return_rows bigint comment "Returned rows of this query",
-    stmt_id int comment "An incremental id of statement",
-    is_query tinyint comment "Is this statemt a query. 1 or 0",
-    frontend_ip varchar (32) comment "Frontend ip of executing this statement",
-    stmt varchar (2048) comment "The original statement, trimed if longer than 
2048 bytes"
-)
-partition by range (time) ()
-distributed by hash (query_id) buckets 1
-properties (
-    "dynamic_partition.time_unit" = "DAY",
-    "dynamic_partition.start" = "-30",
-    "dynamic_partition.end" = "3",
-    "dynamic_partition.prefix" = "p",
-    "dynamic_partition.buckets" = "1",
-    "dynamic_partition.enable" = "true",
-    "replication_num" = "1"
+    query_id varchar(48) comment "Unique query id",
+    time datetime not null comment "Query start time",
+    client_ip varchar(32) comment "Client IP",
+    user varchar(64) comment "User name",
+    db varchar(96) comment "Database of this query",
+    state varchar(8) comment "Query result state. EOF, ERR, OK",
+    query_time bigint comment "Query execution time in millisecond",
+    scan_bytes bigint comment "Total scan bytes of this query",
+    scan_rows bigint comment "Total scan rows of this query",
+    return_rows bigint comment "Returned rows of this query",
+    stmt_id int comment "An incremental id of statement",
+    is_query tinyint comment "Is this statement a query. 1 or 0",
+    frontend_ip varchar(32) comment "Frontend ip of executing this statement",
+    stmt varchar(5000) comment "The original statement, trimmed if longer than 
5000 bytes"
+) engine=OLAP
+duplicate key(query_id, time, client_ip)
+partition by range(time) ()
+distributed by hash(query_id) buckets 1
+properties(
+    "dynamic_partition.time_unit" = "DAY",
+    "dynamic_partition.start" = "-30",
+    "dynamic_partition.end" = "3",
+    "dynamic_partition.prefix" = "p",
+    "dynamic_partition.buckets" = "1",
+    "dynamic_partition.enable" = "true",
+    "replication_num" = "3"
 );
 ```
 
diff --git a/docs/zh-CN/extending-doris/audit-plugin.md 
b/docs/zh-CN/extending-doris/audit-plugin.md
index 7168251..e5f9e4c 100644
--- a/docs/zh-CN/extending-doris/audit-plugin.md
+++ b/docs/zh-CN/extending-doris/audit-plugin.md
@@ -70,8 +70,9 @@ create table doris_audit_tbl__
     stmt_id int comment "An incremental id of statement",
     is_query tinyint comment "Is this statemt a query. 1 or 0",
     frontend_ip varchar(32) comment "Frontend ip of executing this statement",
-    stmt varchar(2048) comment "The original statement, trimed if longer than 
2048 bytes"
-)
+    stmt varchar(5000) comment "The original statement, trimmed if longer than 
5000 bytes"
+) engine=OLAP
+duplicate key(query_id, time, client_ip)
 partition by range(time) ()
 distributed by hash(query_id) buckets 1
 properties(
@@ -81,7 +82,7 @@ properties(
     "dynamic_partition.prefix" = "p",
     "dynamic_partition.buckets" = "1",
     "dynamic_partition.enable" = "true",
-    "replication_num" = "1"
+    "replication_num" = "3"
 );
 ```
 
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.properties 
b/fe_plugins/auditloader/src/main/assembly/plugin.properties
index 3ab896c..0331385 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.properties
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.properties
@@ -18,6 +18,6 @@
 name=AuditLoader
 type=AUDIT
 description=load audit log to olap load, and user can view the statistic of 
queries
-version=0.12.1
+version=0.13.1
 java.version=1.8.0
 classname=org.apache.doris.plugin.audit.AuditLoaderPlugin
diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 5e7ca2d..a785768 100755
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -54,7 +54,7 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
     private StringBuilder auditBuffer = new StringBuilder();
     private long lastLoadTime = 0;
 
-    private BlockingQueue<StringBuilder> batchQueue = new 
LinkedBlockingDeque<StringBuilder>(1);
+    private BlockingQueue<AuditEvent> auditEventQueue = new 
LinkedBlockingDeque<AuditEvent>(1);
     private DorisStreamLoader streamLoader;
     private Thread loadThread;
 
@@ -63,7 +63,9 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
     private volatile boolean isInit = false;
 
     // the max stmt length to be loaded in audit table.
-    private static final int MAX_STMT_LENGTH = 2000;
+    private static final int MAX_STMT_LENGTH = 4096;
+    // the max auditEventQueue size to store audit_event
+    private static final int MAX_AUDIT_EVENT_SIZE = 4096;
 
     @Override
     public void init(PluginInfo info, PluginContext ctx) throws 
PluginException {
@@ -132,8 +134,14 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
     }
 
     public void exec(AuditEvent event) {
-        assembleAudit(event);
-        loadIfNecessary();
+        try {
+            auditEventQueue.add(event);
+        } catch (Exception e) {
+            // In order to ensure that the system can run normally, here we 
directly
+            // discard the current audit_event. If this problem occurs 
frequently,
+            // improvement can be considered.
+            LOG.debug("encounter exception when putting current audit batch, 
discard current audit event", e);
+        }
     }
 
     private void assembleAudit(AuditEvent event) {
@@ -158,7 +166,7 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         auditBuffer.append(stmt).append("\n");
     }
 
-    private void loadIfNecessary() {
+    private void loadIfNecessary(DorisStreamLoader loader) {
         if (auditBuffer.length() < conf.maxBatchSize && 
System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
             return;
         }
@@ -166,15 +174,8 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         lastLoadTime = System.currentTimeMillis();
         // begin to load
         try {
-            if (!batchQueue.isEmpty()) {
-                // TODO(cmy): if queue is not empty, which means the last 
batch is not processed.
-                // In order to ensure that the system can run normally, here 
we directly
-                // discard the current batch. If this problem occurs 
frequently,
-                // improvement can be considered.
-                throw new PluginException("The previous batch is not 
processed, and the current batch is discarded.");
-            }
-
-            batchQueue.put(this.auditBuffer);
+            DorisStreamLoader.LoadResponse response = 
loader.loadBatch(auditBuffer);
+            LOG.debug("audit loader response: {}", response);
         } catch (Exception e) {
             LOG.debug("encounter exception when putting current audit batch, 
discard current batch", e);
         } finally {
@@ -243,16 +244,13 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         public void run() {
             while (!isClosed) {
                 try {
-                    StringBuilder batch = batchQueue.poll(5, TimeUnit.SECONDS);
-                    if (batch == null) {
-                        continue;
+                    AuditEvent event = auditEventQueue.poll(5, 
TimeUnit.SECONDS);
+                    if (event != null) {
+                        assembleAudit(event);
+                        loadIfNecessary(loader);
                     }
-
-                    DorisStreamLoader.LoadResponse response = 
loader.loadBatch(batch);
-                    LOG.debug("audit loader response: {}", response);
                 } catch (InterruptedException e) {
                     LOG.debug("encounter exception when loading current audit 
batch", e);
-                    continue;
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to