This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e9f888b44e [refactor](dialect) make http sql converter plugin and 
audit loader as builtin plugin (#29692)
2e9f888b44e is described below

commit 2e9f888b44e66eda49f9d5198805b7cadc4592a6
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Jan 12 20:40:45 2024 +0800

    [refactor](dialect) make http sql converter plugin and audit loader as 
builtin plugin (#29692)
    
    Followup #28890
    
    Make HttpSqlConverterPlugin and AuditLoader builtin plugins of Doris,
    to make it simpler for users to enable SQL dialect conversion and the audit loader.
    
    HttpSqlConverterPlugin
    
    By default, there is nothing changed.
    
    There is a new global variable sql_converter_service. Its default is empty;
if set, the HttpSqlConverterPlugin will be enabled.
    
    set global sql_converter_service = "http://127.0.0.1:5001/api/v1/convert";
    
    AuditLoader
    
    By default, there is nothing changed.
    
    There is a new global variable enable_audit_plugin. Its default is false;
if set to true, the audit loader plugin will be enabled.
    
    Doris will create the audit_log table in __internal_schema on startup.
    
    If enable_audit_plugin is true, the audit log will be inserted into
the audit_log table.
    
    3 other global variables related to this plugin:
    
    audit_plugin_max_batch_interval_sec: The max interval for audit loader to 
insert a batch of audit log.
    audit_plugin_max_batch_bytes: The max batch size for audit loader to insert 
a batch of audit log.
    audit_plugin_max_sql_length: The max length of statement in audit log
---
 .../main/java/org/apache/doris/common/Config.java  |   7 -
 .../org/apache/doris/analysis/InlineViewRef.java   |   2 +-
 .../doris/catalog/InternalSchemaInitializer.java   |  82 ++++++-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  46 ++--
 .../org/apache/doris/load/StreamLoadRecordMgr.java |   6 +-
 .../org/apache/doris/load/loadv2/BulkLoadJob.java  |   4 +-
 .../org/apache/doris/nereids/parser/Dialect.java   |  26 +--
 .../java/org/apache/doris/plugin/AuditPlugin.java  |   2 +
 .../java/org/apache/doris/plugin/PluginMgr.java    |  26 ++-
 .../doris/plugin/{ => audit}/AuditEvent.java       |   9 +-
 .../doris/plugin/audit/AuditLoaderPlugin.java      | 252 +++++++++++++++++++++
 .../{qe => plugin/audit}/AuditLogBuilder.java      |   7 +-
 .../doris/plugin/audit/AuditStreamLoader.java      | 182 +++++++++++++++
 .../doris/plugin/{ => audit}/LoadAuditEvent.java   |   2 +-
 .../plugin/{ => audit}/StreamLoadAuditEvent.java   |   2 +-
 .../dialect}/HttpDialectConverterPlugin.java       |  80 ++-----
 .../doris/plugin/dialect}/HttpDialectUtils.java    |   6 +-
 .../org/apache/doris/qe/AuditEventProcessor.java   |   2 +-
 .../java/org/apache/doris/qe/AuditLogHelper.java   |  40 ++--
 .../java/org/apache/doris/qe/ConnectContext.java   |   2 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |   9 +-
 .../java/org/apache/doris/qe/GlobalVariable.java   |  23 +-
 .../WorkloadRuntimeStatusMgr.java                  |   2 +-
 .../org/apache/doris/plugin/DialectPluginTest.java |   2 +-
 .../apache/doris/plugin}/HttpDialectUtilsTest.java |  12 +-
 .../org/apache/doris/plugin}/SimpleHttpServer.java |   2 +-
 .../apache/doris/plugin/TestDialectPlugin1.java    |   2 +-
 .../apache/doris/qe/AuditEventProcessorTest.java   |   5 +-
 .../doris/plugin/audit/AuditLoaderPlugin.java      |   2 +-
 fe_plugins/http-dialect-converter/pom.xml          | 119 ----------
 .../src/main/assembly/plugin.conf                  |  22 --
 .../src/main/assembly/plugin.properties            |  23 --
 .../src/main/assembly/zip.xml                      |  43 ----
 fe_plugins/pom.xml                                 |   1 -
 34 files changed, 684 insertions(+), 368 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8090a6b3bad..5a3685a2c5e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2398,13 +2398,6 @@ public class Config extends ConfigBase {
             "Whether to enable the function of getting log files through http 
interface"})
     public static boolean enable_get_log_file_api = false;
 
-    // This config is deprecated and has not taken effect anymore,
-    // please use dialect plugin: fe_plugins/http-dialect-converter for instead
-    @Deprecated
-    @ConfField(description = {"用于SQL方言转换的服务地址。",
-            "The service address for SQL dialect conversion."})
-    public static String sql_convertor_service = "";
-
     @ConfField(mutable = true)
     public static boolean enable_profile_when_analyze = false;
     @ConfField(mutable = true)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
index fb5b8072edb..178b0d0e141 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
@@ -200,7 +200,7 @@ public class InlineViewRef extends TableRef {
         if (view == null && !hasExplicitAlias()) {
             String dialect = 
ConnectContext.get().getSessionVariable().getSqlDialect();
             Dialect sqlDialect = Dialect.getByName(dialect);
-            if (Dialect.SPARK_SQL != sqlDialect) {
+            if (Dialect.SPARK != sqlDialect) {
                 
ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS);
             }
             hasExplicitAlias = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index c7247a443b4..d53520b133c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -24,6 +24,8 @@ import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.DropTableStmt;
 import org.apache.doris.analysis.HashDistributionDesc;
 import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.RangePartitionDesc;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.common.Config;
@@ -33,6 +35,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.plugin.audit.AuditLoaderPlugin;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -53,6 +56,33 @@ public class InternalSchemaInitializer extends Thread {
 
     private static final Logger LOG = 
LogManager.getLogger(InternalSchemaInitializer.class);
 
+    public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;
+
+    static {
+        AUDIT_TABLE_COLUMNS = new ArrayList<>();
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", 
TypeDef.createVarchar(48), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", 
TypeDef.create(PrimitiveType.DATETIME), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", 
TypeDef.create(PrimitiveType.INT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", 
TypeDef.create(PrimitiveType.STRING), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", 
TypeDef.create(PrimitiveType.TINYINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", 
TypeDef.createVarchar(128), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", 
TypeDef.create(PrimitiveType.STRING), true));
+    }
+
     public void run() {
         if (!FeConstants.enableInternalSchemaDb) {
             return;
@@ -83,6 +113,7 @@ public class InternalSchemaInitializer extends Thread {
         Database database = op.get();
         modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
         modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
+        modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
     }
 
     public void modifyTblReplicaCount(Database database, String tblName) {
@@ -103,8 +134,8 @@ public class InternalSchemaInitializer extends Thread {
                             >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
                         return;
                     }
+                    colStatsTbl.writeLock();
                     try {
-                        colStatsTbl.writeLock();
                         
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) 
colStatsTbl, props);
                     } finally {
                         colStatsTbl.writeUnlock();
@@ -123,8 +154,11 @@ public class InternalSchemaInitializer extends Thread {
     }
 
     private void createTbl() throws UserException {
+        // statistics
         
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
         
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
+        // audit table
+        
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
     }
 
     @VisibleForTesting
@@ -212,7 +246,40 @@ public class InternalSchemaInitializer extends Thread {
         return createTableStmt;
     }
 
+    private CreateTableStmt buildAuditTblStmt() throws UserException {
+        TableName tableName = new TableName("",
+                FeConstants.INTERNAL_DB_NAME, 
AuditLoaderPlugin.AUDIT_LOG_TABLE);
+
+        String engineName = "olap";
+        ArrayList<String> dupKeys = Lists.newArrayList("query_id", "time", 
"client_ip");
+        KeysDesc keysDesc = new KeysDesc(KeysType.DUP_KEYS, dupKeys);
+        // partition
+        PartitionDesc partitionDesc = new 
RangePartitionDesc(Lists.newArrayList("time"), Lists.newArrayList());
+        // distribution
+        int bucketNum = 2;
+        DistributionDesc distributionDesc = new 
HashDistributionDesc(bucketNum, Lists.newArrayList("query_id"));
+        Map<String, String> properties = new HashMap<String, String>() {
+            {
+                put("dynamic_partition.time_unit", "DAY");
+                put("dynamic_partition.start", "-30");
+                put("dynamic_partition.end", "3");
+                put("dynamic_partition.prefix", "p");
+                put("dynamic_partition.buckets", String.valueOf(bucketNum));
+                put("dynamic_partition.enable", "true");
+                put("replication_num", String.valueOf(Math.max(1,
+                        Config.min_replication_num_per_tablet)));
+            }
+        };
+        CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
+                tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, 
partitionDesc, distributionDesc,
+                properties, null, "Doris internal audit table, DO NOT MODIFY 
IT", null);
+        StatisticsUtil.analyze(createTableStmt);
+        return createTableStmt;
+    }
+
+
     private boolean created() {
+        // 1. check database exist
         Optional<Database> optionalDatabase =
                 Env.getCurrentEnv().getInternalCatalog()
                         .getDb(FeConstants.INTERNAL_DB_NAME);
@@ -225,6 +292,7 @@ public class InternalSchemaInitializer extends Thread {
             return false;
         }
 
+        // 2. check statistic tables
         Table statsTbl = optionalStatsTbl.get();
         Optional<Column> optionalColumn =
                 statsTbl.fullSchema.stream().filter(c -> 
c.getName().equals("count")).findFirst();
@@ -238,7 +306,17 @@ public class InternalSchemaInitializer extends Thread {
             }
             return false;
         }
-        return db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent();
+        optionalStatsTbl = db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME);
+        if (!optionalStatsTbl.isPresent()) {
+            return false;
+        }
+
+        // 3. check audit table
+        optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE);
+        if (!optionalStatsTbl.isPresent()) {
+            return false;
+        }
+        return true;
     }
 
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 2b4f20a57cb..a78a7e9fa58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.LoadException;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
@@ -104,21 +105,21 @@ public class LoadAction extends RestBaseController {
             return redirectToHttps(request);
         }
 
-        try {
-            executeCheckPassword(request, response);
-        } catch (UnauthorizedException unauthorizedException) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Check password failed, going to check auth token, 
request: {}", request.toString());
+        String authToken = request.getHeader("token");
+        // if auth token is not null, check it first
+        if (!Strings.isNullOrEmpty(authToken)) {
+            if (!checkClusterToken(authToken)) {
+                throw new UnauthorizedException("Invalid token: " + authToken);
             }
-
-            if (!checkClusterToken(request)) {
-                throw unauthorizedException;
-            } else {
-                return executeWithClusterToken(request, db, table, true);
+            return executeWithClusterToken(request, db, table, true);
+        } else {
+            try {
+                executeCheckPassword(request, response);
+                return executeWithoutPassword(request, response, db, table, 
true, groupCommit);
+            } finally {
+                ConnectContext.remove();
             }
         }
-
-        return executeWithoutPassword(request, response, db, table, true, 
groupCommit);
     }
 
     @RequestMapping(path = "/api/_http_stream",
@@ -363,18 +364,8 @@ public class LoadAction extends RestBaseController {
     // AuditlogPlugin should be re-disigned carefully, and blow method focuses 
on
     // temporarily addressing the users' needs for audit logs.
     // So this function is not widely tested under general scenario
-    private boolean checkClusterToken(HttpServletRequest request) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Checking cluser token, request {}", request.toString());
-        }
-
-        String authToken = request.getHeader("token");
-
-        if (Strings.isNullOrEmpty(authToken)) {
-            return false;
-        }
-
-        return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
+    private boolean checkClusterToken(String token) {
+        return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
     }
 
     // NOTE: This function can only be used for AuditlogPlugin stream load for 
now.
@@ -388,6 +379,9 @@ public class LoadAction extends RestBaseController {
             ctx.setEnv(Env.getCurrentEnv());
             ctx.setThreadLocalInfo();
             ctx.setRemoteIP(request.getRemoteAddr());
+            // set user to ADMIN_USER, so that we can get the proper resource 
tag
+            ctx.setQualifiedUser(Auth.ADMIN_USER);
+            ctx.setThreadLocalInfo();
 
             String dbName = db;
             String tableName = table;
@@ -444,8 +438,10 @@ public class LoadAction extends RestBaseController {
 
             return redirectView;
         } catch (Exception e) {
-            LOG.warn("Failed to execute stream load with cluster token, {}", 
e);
+            LOG.warn("Failed to execute stream load with cluster token, {}", 
e.getMessage(), e);
             return new RestBaseResult(e.getMessage());
+        } finally {
+            ConnectContext.remove();
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
index 58d789f9707..488e73f3ab4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
@@ -28,9 +28,9 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.plugin.AuditEvent;
-import org.apache.doris.plugin.AuditEvent.EventType;
-import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.plugin.audit.AuditEvent;
+import org.apache.doris.plugin.audit.AuditEvent.EventType;
+import org.apache.doris.plugin.audit.StreamLoadAuditEvent;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index 1939e86f851..f3d2480351b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -41,8 +41,8 @@ import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
-import org.apache.doris.plugin.AuditEvent;
-import org.apache.doris.plugin.LoadAuditEvent;
+import org.apache.doris.plugin.audit.AuditEvent;
+import org.apache.doris.plugin.audit.LoadAuditEvent;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java
index 722a9ff7cce..9d9b0cb4551 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java
@@ -36,25 +36,25 @@ public enum Dialect {
      */
     PRESTO("presto"),
     /**
-     * Spark sql parser dialect
+     * Spark3 sql parser dialect
      */
-    SPARK_SQL("spark_sql"),
+    SPARK("spark"),
     /**
-     * Hive parser dialect
+     * Spark2 sql parser dialect
      */
-    HIVE("hive"),
+    SPARK2("spark2"),
     /**
-     * Alibaba max compute parser dialect
+     * Flink sql parser dialect
      */
-    MAX_COMPUTE("max_compute"),
+    FLINK("flink"),
     /**
-     * Mysql parser dialect
+     * Hive parser dialect
      */
-    MYSQL("mysql"),
+    HIVE("hive"),
     /**
      * Postgresql parser dialect
      */
-    POSTGRESQL("postgresql"),
+    POSTGRES("postgres"),
     /**
      * Sqlserver parser dialect
      */
@@ -64,13 +64,9 @@ public enum Dialect {
      */
     CLICKHOUSE("clickhouse"),
     /**
-     * Sap hana parser dialect
-     */
-    SAP_HANA("sap_hana"),
-    /**
-     * OceanBase parser dialect
+     * oracle parser dialect
      */
-    OCEANBASE("oceanbase");
+    ORACLE("oracle");
 
     public static final int MAX_DIALECT_SIZE = Dialect.values().length;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java
index d9c9ec84697..55962a3dd10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.plugin;
 
+import org.apache.doris.plugin.audit.AuditEvent;
+
 /**
  * Audit plugin interface describe.
  */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
index 9a38bce0173..7fddf54e1ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
@@ -27,7 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.nereids.parser.Dialect;
 import org.apache.doris.plugin.PluginInfo.PluginType;
 import org.apache.doris.plugin.PluginLoader.PluginStatus;
-import org.apache.doris.qe.AuditLogBuilder;
+import org.apache.doris.plugin.audit.AuditLoaderPlugin;
+import org.apache.doris.plugin.audit.AuditLogBuilder;
+import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -104,12 +106,24 @@ public class PluginMgr implements Writable {
     }
 
     private void initBuiltinPlugins() {
-        // AuditLog
+        // AuditLog: log audit log to file
         AuditLogBuilder auditLogBuilder = new AuditLogBuilder();
         if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), 
auditLogBuilder)) {
             LOG.warn("failed to register audit log builder");
         }
 
+        // AuditLoader: log audit log to internal table
+        AuditLoaderPlugin auditLoaderPlugin = new AuditLoaderPlugin();
+        if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), 
auditLoaderPlugin)) {
+            LOG.warn("failed to register audit log builder");
+        }
+
+        // sql dialect converter
+        HttpDialectConverterPlugin httpDialectConverterPlugin = new 
HttpDialectConverterPlugin();
+        if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), 
httpDialectConverterPlugin)) {
+            LOG.warn("failed to register http dialect converter plugin");
+        }
+
         // other builtin plugins
     }
 
@@ -217,11 +231,17 @@ public class PluginMgr implements Writable {
         }
 
         PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, 
pluginInfo, plugin);
-        PluginLoader checkLoader = 
plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
+        try {
+            loader.install();
+        } catch (Exception e) {
+            LOG.warn("failed to register builtin plugin {}", 
pluginInfo.getName(), e);
+            return false;
+        }
         // add dialect plugin
         if (plugin instanceof DialectConverterPlugin) {
             addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo);
         }
+        PluginLoader checkLoader = 
plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
         return checkLoader == null;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
similarity index 97%
rename from fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index 732d33c5e18..5c122c98965 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin;
+package org.apache.doris.plugin.audit;
 
 
 import java.lang.annotation.Retention;
@@ -56,6 +56,8 @@ public class AuditEvent {
     public String clientIp = "";
     @AuditField(value = "User")
     public String user = "";
+    @AuditField(value = "Ctl")
+    public String ctl = "";
     @AuditField(value = "Db")
     public String db = "";
     @AuditField(value = "State")
@@ -133,6 +135,11 @@ public class AuditEvent {
             return this;
         }
 
+        public AuditEventBuilder setCtl(String ctl) {
+            auditEvent.ctl = ctl;
+            return this;
+        }
+
         public AuditEventBuilder setDb(String db) {
             auditEvent.db = db;
             return this;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
new file mode 100644
index 00000000000..533f50f062d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.plugin.audit;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.util.DigitalVersion;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.plugin.AuditPlugin;
+import org.apache.doris.plugin.Plugin;
+import org.apache.doris.plugin.PluginContext;
+import org.apache.doris.plugin.PluginException;
+import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.plugin.PluginInfo.PluginType;
+import org.apache.doris.plugin.PluginMgr;
+import org.apache.doris.qe.GlobalVariable;
+
+import com.google.common.collect.Queues;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/*
+ * This plugin will load audit log to specified doris table at specified 
interval
+ */
+public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
+    private static final Logger LOG = 
LogManager.getLogger(AuditLoaderPlugin.class);
+
+    public static final String AUDIT_LOG_TABLE = "audit_log";
+
+    private static final DateTimeFormatter DATETIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+            .withZone(ZoneId.systemDefault());
+
+    private StringBuilder auditLogBuffer = new StringBuilder();
+    private int auditLogNum = 0;
+    private long lastLoadTimeAuditLog = 0;
+    // sometimes the audit log may fail to load to doris, count it to observe.
+    private long discardLogNum = 0;
+
+    private BlockingQueue<AuditEvent> auditEventQueue;
+    private AuditStreamLoader streamLoader;
+    private Thread loadThread;
+
+    private volatile boolean isClosed = false;
+    private volatile boolean isInit = false;
+
+    private final PluginInfo pluginInfo;
+
+    public AuditLoaderPlugin() {
+        pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + 
"AuditLoader", PluginType.AUDIT,
+                "builtin audit loader, to load audit log to internal table", 
DigitalVersion.fromString("2.1.0"),
+                DigitalVersion.fromString("1.8.31"), 
AuditLoaderPlugin.class.getName(), null, null);
+    }
+
+    public PluginInfo getPluginInfo() {
+        return pluginInfo;
+    }
+
+    @Override
+    public void init(PluginInfo info, PluginContext ctx) throws 
PluginException {
+        super.init(info, ctx);
+
+        synchronized (this) {
+            if (isInit) {
+                return;
+            }
+            this.lastLoadTimeAuditLog = System.currentTimeMillis();
+            // make capacity large enough to avoid blocking.
+            // and it will not be too large because the audit log will flush 
if num in queue is larger than
+            // GlobalVariable.audit_plugin_max_batch_bytes.
+            this.auditEventQueue = Queues.newLinkedBlockingDeque(100000);
+            this.streamLoader = new AuditStreamLoader();
+            this.loadThread = new Thread(new LoadWorker(this.streamLoader), 
"audit loader thread");
+            this.loadThread.start();
+
+            isInit = true;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        isClosed = true;
+        if (loadThread != null) {
+            try {
+                loadThread.join();
+            } catch (InterruptedException e) {
+                LOG.debug("encounter exception when closing the audit loader", 
e);
+            }
+        }
+    }
+
+    public boolean eventFilter(AuditEvent.EventType type) {
+        return type == AuditEvent.EventType.AFTER_QUERY;
+    }
+
+    /**
+     * Hands an audit event over to the background load worker.
+     *
+     * <p>The event is dropped (and counted in {@code discardLogNum}) when the builtin audit
+     * loader is enabled but the queue cannot accept it. When the loader is disabled the event
+     * is ignored without being counted.
+     *
+     * @param event the audit event to enqueue
+     */
+    public void exec(AuditEvent event) {
+        if (!GlobalVariable.enableAuditLoader) {
+            LOG.debug("builtin audit loader is disabled, discard current audit event");
+            return;
+        }
+        try {
+            // offer() signals a full queue through its return value, so the common overflow
+            // case does not pay for building and logging an exception.
+            if (!auditEventQueue.offer(event)) {
+                ++discardLogNum;
+                LOG.debug("audit event queue is full, discard current audit event."
+                        + " total discard num: {}", discardLogNum);
+            }
+        } catch (Exception e) {
+            // In order to ensure that the system can run normally, here we directly
+            // discard the current audit_event. If this problem occurs frequently,
+            // improvement can be considered.
+            ++discardLogNum;
+            LOG.debug("encounter exception when putting current audit batch, discard current audit event."
+                    + " total discard num: {}", discardLogNum, e);
+        }
+    }
+
+    /**
+     * Serializes one event into the current batch buffer and bumps the batch row counter.
+     *
+     * @param event the audit event to add to the pending batch
+     */
+    private void assembleAudit(AuditEvent event) {
+        fillLogBuffer(event, auditLogBuffer);
+        auditLogNum += 1;
+    }
+
+    /**
+     * Appends one audit event to {@code logBuffer} as a single tab-separated line ending in
+     * a newline. The statement text is written last, after truncation and after its
+     * newlines, tabs and carriage returns have been replaced by spaces.
+     *
+     * @param event the audit event to serialize
+     * @param logBuffer the buffer that receives the serialized line
+     */
+    private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
+        logBuffer.append(event.queryId).append('\t')
+                .append(TimeUtils.longToTimeString(event.timestamp)).append('\t')
+                .append(event.clientIp).append('\t')
+                .append(event.user).append('\t')
+                .append(event.ctl).append('\t')
+                .append(event.db).append('\t')
+                .append(event.state).append('\t')
+                .append(event.errorCode).append('\t')
+                .append(event.errorMessage).append('\t')
+                .append(event.queryTime).append('\t')
+                .append(event.scanBytes).append('\t')
+                .append(event.scanRows).append('\t')
+                .append(event.returnRows).append('\t')
+                .append(event.stmtId).append('\t')
+                .append(event.isQuery ? 1 : 0).append('\t')
+                .append(event.feIp).append('\t')
+                .append(event.cpuTimeMs).append('\t')
+                .append(event.sqlHash).append('\t')
+                .append(event.sqlDigest).append('\t')
+                .append(event.peakMemoryBytes).append('\t');
+        // Cap the statement size first, then flatten it so it cannot break the
+        // one-row-per-line, tab-separated layout.
+        String truncated = truncateByBytes(event.stmt);
+        String singleLine = truncated.replace("\n", " ").replace("\t", " ").replace("\r", " ");
+        LOG.debug("receive audit event with stmt: {}", singleLine);
+        logBuffer.append(singleLine).append('\n');
+    }
+
+    /**
+     * Truncates {@code str} so that its UTF-8 encoding is at most
+     * {@code GlobalVariable.auditPluginMaxSqlLength} bytes long, never cutting a multi-byte
+     * character in half.
+     *
+     * @param str the text to truncate; {@code null} is treated as an empty string
+     * @return {@code str} itself when it already fits, otherwise the longest prefix whose
+     *         UTF-8 encoding fits into the limit
+     */
+    private String truncateByBytes(String str) {
+        if (str == null) {
+            // The caller has already written the other columns of the row when it asks for
+            // the statement, so fail soft instead of leaving a half-written row behind.
+            return "";
+        }
+        // Encode explicitly as UTF-8: the decoder below is UTF-8, so relying on the platform
+        // default charset would mis-measure and mis-decode on non-UTF-8 JVMs.
+        Charset utf8Charset = Charset.forName("UTF-8");
+        byte[] utf8Bytes = str.getBytes(utf8Charset);
+        // Clamp to [0, length] so a negative limit cannot make ByteBuffer.wrap throw.
+        int maxLen = Math.max(0, Math.min(GlobalVariable.auditPluginMaxSqlLength, utf8Bytes.length));
+        if (maxLen >= utf8Bytes.length) {
+            return str;
+        }
+        CharsetDecoder decoder = utf8Charset.newDecoder();
+        // A multi-byte sequence cut off at maxLen is malformed input; IGNORE drops it
+        // instead of emitting a replacement character.
+        decoder.onMalformedInput(CodingErrorAction.IGNORE);
+        ByteBuffer buffer = ByteBuffer.wrap(utf8Bytes, 0, maxLen);
+        // One char per byte is an upper bound for decoded UTF-8.
+        CharBuffer charBuffer = CharBuffer.allocate(maxLen);
+        decoder.decode(buffer, charBuffer, true);
+        decoder.flush(charBuffer);
+        return new String(charBuffer.array(), 0, charBuffer.position());
+    }
+
+    /**
+     * Loads the pending batch when it has reached
+     * {@code GlobalVariable.auditPluginMaxBatchBytes} characters or when
+     * {@code GlobalVariable.auditPluginMaxBatchInternalSec} seconds have passed since the
+     * last load attempt. Whatever the outcome, the batch is reset afterwards; rows that
+     * could not be loaded are added to {@code discardLogNum}.
+     *
+     * @param loader the stream loader used to send the batch
+     */
+    private void loadIfNecessary(AuditStreamLoader loader) {
+        long currentTime = System.currentTimeMillis();
+        boolean batchIsFull = auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes;
+        // 1000L forces long arithmetic so a large interval cannot overflow.
+        boolean intervalElapsed = currentTime - lastLoadTimeAuditLog
+                >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000L;
+        if (!batchIsFull && !intervalElapsed) {
+            return;
+        }
+
+        // begin to load
+        try {
+            String token;
+            try {
+                // Acquire token from master
+                token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            } catch (Exception e) {
+                LOG.warn("Failed to get auth token", e);
+                discardLogNum += auditLogNum;
+                return;
+            }
+            AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token);
+            LOG.debug("audit loader response: {}", response);
+            // loadBatch reports its own failures as a negative status instead of throwing,
+            // so those batches have to be counted as discarded here.
+            if (response.status < 0) {
+                discardLogNum += auditLogNum;
+            }
+        } catch (Exception e) {
+            LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
+            discardLogNum += auditLogNum;
+        } finally {
+            // make a new string builder to receive following events.
+            resetBatch(currentTime);
+            if (discardLogNum > 0) {
+                LOG.info("num of total discarded audit logs: {}", discardLogNum);
+            }
+        }
+    }
+
+    /**
+     * Starts a fresh batch: zeroes the row counter, records {@code currentTime} as the time
+     * of the last load and swaps in a new, empty buffer.
+     *
+     * @param currentTime the timestamp, in milliseconds, to record as the last load time
+     */
+    private void resetBatch(long currentTime) {
+        auditLogNum = 0;
+        lastLoadTimeAuditLog = currentTime;
+        auditLogBuffer = new StringBuilder();
+    }
+
+    /**
+     * Background task that drains the audit event queue, appends each event to the pending
+     * batch and triggers a load whenever the batch is due. It runs until the enclosing
+     * plugin is marked as closed.
+     */
+    private class LoadWorker implements Runnable {
+        private final AuditStreamLoader loader;
+
+        public LoadWorker(AuditStreamLoader loader) {
+            this.loader = loader;
+        }
+
+        @Override
+        public void run() {
+            while (!isClosed) {
+                try {
+                    // Wake up at least every 5 seconds so the closed flag and the batch
+                    // interval are re-checked even when no events arrive.
+                    AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
+                    if (event != null) {
+                        assembleAudit(event);
+                    }
+                    // Check the thresholds on idle wake-ups too; otherwise a partially
+                    // filled batch is only flushed once the next event arrives. Skip the
+                    // check when nothing is buffered to avoid sending empty loads.
+                    if (auditLogBuffer.length() > 0) {
+                        loadIfNecessary(loader);
+                    }
+                } catch (InterruptedException ie) {
+                    // The flag is deliberately not re-asserted: the loop keeps running until
+                    // isClosed is set, and a pending interrupt would make every poll() fail
+                    // immediately and spin.
+                    LOG.debug("encounter exception when loading current audit batch", ie);
+                } catch (Exception e) {
+                    LOG.error("run audit logger error:", e);
+                }
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
similarity index 97%
rename from fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
index 1258d0b3376..f06dfb6eef9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
@@ -15,19 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.qe;
+package org.apache.doris.plugin.audit;
 
 import org.apache.doris.common.AuditLog;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.DigitalVersion;
-import org.apache.doris.plugin.AuditEvent;
-import org.apache.doris.plugin.AuditEvent.AuditField;
-import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.plugin.AuditPlugin;
 import org.apache.doris.plugin.Plugin;
 import org.apache.doris.plugin.PluginInfo;
 import org.apache.doris.plugin.PluginInfo.PluginType;
 import org.apache.doris.plugin.PluginMgr;
+import org.apache.doris.plugin.audit.AuditEvent.AuditField;
+import org.apache.doris.plugin.audit.AuditEvent.EventType;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
new file mode 100644
index 00000000000..07175478275
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.plugin.audit;
+
+import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Calendar;
+import java.util.stream.Collectors;
+
+public class AuditStreamLoader {
+    private static final Logger LOG = 
LogManager.getLogger(AuditStreamLoader.class);
+    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";;
+    private String hostPort;
+    private String db;
+    private String auditLogTbl;
+    private String auditLogLoadUrlStr;
+    private String feIdentity;
+
+    public AuditStreamLoader() {
+        this.hostPort = "127.0.0.1:" + Config.http_port;
+        this.db = FeConstants.INTERNAL_DB_NAME;
+        this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE;
+        this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, 
auditLogTbl);
+        // currently, FE identity is FE's IP, so we replace the "." in IP to 
make it suitable for label
+        this.feIdentity = hostPort.replaceAll("\\.", "_");
+    }
+
+    private HttpURLConnection getConnection(String urlStr, String label, 
String clusterToken) throws IOException {
+        URL url = new URL(urlStr);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setInstanceFollowRedirects(false);
+        conn.setRequestMethod("PUT");
+        conn.setRequestProperty("token", clusterToken);
+        conn.setRequestProperty("Authorization", "Basic ");
+        conn.addRequestProperty("Expect", "100-continue");
+        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
+        conn.addRequestProperty("label", label);
+        conn.addRequestProperty("max_filter_ratio", "1.0");
+        conn.addRequestProperty("columns",
+                InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c 
-> c.getName()).collect(
+                        Collectors.joining(",")));
+        conn.setDoOutput(true);
+        conn.setDoInput(true);
+        return conn;
+    }
+
+    private String toCurl(HttpURLConnection conn) {
+        StringBuilder sb = new StringBuilder("curl -v ");
+        sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n  ");
+        sb.append("-H \"").append("Authorization\":").append("\"Basic 
").append("\" \\\n  ");
+        sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n  
");
+        sb.append("-H \"").append("Content-Type\":").append("\"text/plain; 
charset=UTF-8\" \\\n  ");
+        sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n  
");
+        sb.append("-H \"").append("columns\":")
+                .append("\"" + 
InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> 
c.getName()).collect(
+                        Collectors.joining(",")) + "\" \\\n  ");
+        sb.append("\"").append(conn.getURL()).append("\"");
+        return sb.toString();
+    }
+
+    private String getContent(HttpURLConnection conn) {
+        BufferedReader br = null;
+        StringBuilder response = new StringBuilder();
+        String line;
+        try {
+            if (100 <= conn.getResponseCode() && conn.getResponseCode() <= 
399) {
+                br = new BufferedReader(new 
InputStreamReader(conn.getInputStream()));
+            } else {
+                br = new BufferedReader(new 
InputStreamReader(conn.getErrorStream()));
+            }
+            while ((line = br.readLine()) != null) {
+                response.append(line);
+            }
+        } catch (IOException e) {
+            LOG.warn("get content error,", e);
+        }
+
+        return response.toString();
+    }
+
+    public LoadResponse loadBatch(StringBuilder sb, String clusterToken) {
+        Calendar calendar = Calendar.getInstance();
+        String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
+                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, 
calendar.get(Calendar.DAY_OF_MONTH),
+                calendar.get(Calendar.HOUR_OF_DAY), 
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
+                feIdentity);
+
+        HttpURLConnection feConn = null;
+        HttpURLConnection beConn = null;
+        try {
+            // build request and send to fe
+            label = "audit" + label;
+            feConn = getConnection(auditLogLoadUrlStr, label, clusterToken);
+            int status = feConn.getResponseCode();
+            // fe send back http response code TEMPORARY_REDIRECT 307 and new 
be location
+            if (status != 307) {
+                throw new Exception("status is not TEMPORARY_REDIRECT 307, 
status: " + status
+                        + ", response: " + getContent(feConn) + ", request is: 
" + toCurl(feConn));
+            }
+            String location = feConn.getHeaderField("Location");
+            if (location == null) {
+                throw new Exception("redirect location is null");
+            }
+            // build request and send to new be location
+            beConn = getConnection(location, label, clusterToken);
+            // send data to be
+            try (BufferedOutputStream bos = new 
BufferedOutputStream(beConn.getOutputStream())) {
+                bos.write(sb.toString().getBytes());
+            }
+
+            // get respond
+            status = beConn.getResponseCode();
+            String respMsg = beConn.getResponseMessage();
+            String response = getContent(beConn);
+
+            LOG.info("AuditLoader plugin load with label: {}, response code: 
{}, msg: {}, content: {}",
+                    label, status, respMsg, response);
+
+            return new LoadResponse(status, respMsg, response);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            String err = "failed to load audit via AuditLoader plugin with 
label: " + label;
+            LOG.warn(err, e);
+            return new LoadResponse(-1, e.getMessage(), err);
+        } finally {
+            if (feConn != null) {
+                feConn.disconnect();
+            }
+            if (beConn != null) {
+                beConn.disconnect();
+            }
+        }
+    }
+
+    public static class LoadResponse {
+        public int status;
+        public String respMsg;
+        public String respContent;
+
+        public LoadResponse(int status, String respMsg, String respContent) {
+            this.status = status;
+            this.respMsg = respMsg;
+            this.respContent = respContent;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("status: ").append(status);
+            sb.append(", resp msg: ").append(respMsg);
+            sb.append(", resp content: ").append(respContent);
+            return sb.toString();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java
similarity index 99%
rename from fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java
index 704ec3ad039..eb3e098bf41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin;
+package org.apache.doris.plugin.audit;
 
 public class LoadAuditEvent extends AuditEvent {
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java
similarity index 99%
rename from 
fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java
index 04b15b9264c..8733a59656c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin;
+package org.apache.doris.plugin.audit;
 
 public class StreamLoadAuditEvent extends AuditEvent {
 
diff --git 
a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
similarity index 56%
rename from 
fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
index a21e5018395..29dca027c6a 100644
--- 
a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java
@@ -15,32 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin.dialect.http;
+package org.apache.doris.plugin.dialect;
 
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.common.util.DigitalVersion;
 import org.apache.doris.nereids.parser.Dialect;
 import org.apache.doris.plugin.DialectConverterPlugin;
 import org.apache.doris.plugin.Plugin;
 import org.apache.doris.plugin.PluginContext;
 import org.apache.doris.plugin.PluginException;
 import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.plugin.PluginInfo.PluginType;
+import org.apache.doris.plugin.PluginMgr;
+import org.apache.doris.qe.GlobalVariable;
 import org.apache.doris.qe.SessionVariable;
 
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /**
@@ -72,57 +67,29 @@ import javax.annotation.Nullable;
 public class HttpDialectConverterPlugin extends Plugin implements 
DialectConverterPlugin {
 
     private volatile boolean isInit = false;
-    private volatile boolean isClosed = false;
-    private volatile String targetURL = null;
-    private volatile ImmutableSet<Dialect> acceptDialects = null;
+    private volatile ImmutableSet<Dialect> acceptDialects;
+    private final PluginInfo pluginInfo;
+
+    /**
+     * Creates the builtin converter plugin: builds its {@link PluginInfo} and fixes the set
+     * of accepted dialects to PRESTO, TRINO, HIVE, SPARK, POSTGRES and CLICKHOUSE.
+     */
+    public HttpDialectConverterPlugin() {
+        String pluginName = PluginMgr.BUILTIN_PLUGIN_PREFIX + "SqlDialectConverter";
+        // NOTE(review): presumably the plugin version and the required Java version - confirm
+        // against the PluginInfo constructor.
+        DigitalVersion firstVersion = DigitalVersion.fromString("2.1.0");
+        DigitalVersion secondVersion = DigitalVersion.fromString("1.8.31");
+        String className = HttpDialectConverterPlugin.class.getName();
+        pluginInfo = new PluginInfo(pluginName, PluginType.DIALECT, "builtin sql dialect converter",
+                firstVersion, secondVersion, className, null, null);
+
+        List<Dialect> supported = Arrays.asList(Dialect.PRESTO, Dialect.TRINO, Dialect.HIVE,
+                Dialect.SPARK, Dialect.POSTGRES, Dialect.CLICKHOUSE);
+        acceptDialects = ImmutableSet.copyOf(supported);
+    }
+
+    /**
+     * Returns the plugin descriptor created in the constructor.
+     *
+     * @return this plugin's {@link PluginInfo}
+     */
+    public PluginInfo getPluginInfo() {
+        return this.pluginInfo;
+    }
 
     @Override
     public void init(PluginInfo info, PluginContext ctx) throws 
PluginException {
         super.init(info, ctx);
-
-        synchronized (this) {
-            if (isInit) {
-                return;
-            }
-            loadConfig(ctx, info.getProperties());
-            isInit = true;
-        }
-    }
-
-    private void loadConfig(PluginContext ctx, Map<String, String> 
pluginInfoProperties) throws PluginException {
-        Path pluginPath = 
FileSystems.getDefault().getPath(ctx.getPluginPath());
-        if (!Files.exists(pluginPath)) {
-            throw new PluginException("plugin path does not exist: " + 
pluginPath);
-        }
-
-        Path confFile = pluginPath.resolve("plugin.conf");
-        if (!Files.exists(confFile)) {
-            throw new PluginException("plugin conf file does not exist: " + 
confFile);
-        }
-
-        final Properties props = new Properties();
-        try (InputStream stream = Files.newInputStream(confFile)) {
-            props.load(stream);
-        } catch (IOException e) {
-            throw new PluginException(e.getMessage());
-        }
-
-        for (Map.Entry<String, String> entry : 
pluginInfoProperties.entrySet()) {
-            props.setProperty(entry.getKey(), entry.getValue());
-        }
-
-        final Map<String, String> properties = 
props.stringPropertyNames().stream()
-                .collect(Collectors.toMap(Function.identity(), 
props::getProperty));
-        targetURL = properties.get("target_url");
-        String acceptDialectsStr = 
Objects.requireNonNull(properties.get("accept_dialects"));
-        acceptDialects = 
ImmutableSet.copyOf(Arrays.stream(acceptDialectsStr.split(","))
-                    .map(Dialect::getByName).collect(Collectors.toSet()));
     }
 
     @Override
     public void close() throws IOException {
         super.close();
-        isClosed = true;
     }
 
     @Override
@@ -132,8 +99,11 @@ public class HttpDialectConverterPlugin extends Plugin 
implements DialectConvert
 
     @Override
     public @Nullable String convertSql(String originSql, SessionVariable 
sessionVariable) {
-        Preconditions.checkNotNull(targetURL);
-        return HttpDialectUtils.convertSql(targetURL, originSql);
+        String targetURL = GlobalVariable.sqlConverterServiceUrl;
+        if (Strings.isNullOrEmpty(targetURL)) {
+            return null;
+        }
+        return HttpDialectUtils.convertSql(targetURL, originSql, 
sessionVariable.getSqlDialect());
     }
 
     // no need to override parseSqlWithDialect, just return null
diff --git 
a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
similarity index 98%
rename from 
fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
index 73c2f470673..39c6417988f 100644
--- 
a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin.dialect.http;
+package org.apache.doris.plugin.dialect;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
@@ -38,8 +38,8 @@ import java.nio.charset.StandardCharsets;
 public class HttpDialectUtils {
     private static final Logger LOG = 
LogManager.getLogger(HttpDialectUtils.class);
 
-    public static String convertSql(String targetURL, String originStmt) {
-        ConvertRequest convertRequest = new ConvertRequest(originStmt, 
"presto");
+    public static String convertSql(String targetURL, String originStmt, 
String dialect) {
+        ConvertRequest convertRequest = new ConvertRequest(originStmt, 
dialect);
 
         HttpURLConnection connection = null;
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
index c116e7c16ae..f9ab35f9c23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.plugin.AuditPlugin;
 import org.apache.doris.plugin.Plugin;
 import org.apache.doris.plugin.PluginInfo.PluginType;
 import org.apache.doris.plugin.PluginMgr;
+import org.apache.doris.plugin.audit.AuditEvent;
 
 import com.google.common.collect.Queues;
 import org.apache.logging.log4j.LogManager;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 83cd1d401f8..7b6e86ca3a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -24,8 +24,11 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder;
+import org.apache.doris.plugin.audit.AuditEvent.EventType;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.service.FrontendOptions;
 
@@ -39,8 +42,17 @@ public class AuditLogHelper {
         // slow query
         long endTime = System.currentTimeMillis();
         long elapseMs = endTime - ctx.getStartTime();
+        CatalogIf catalog = ctx.getCurrentCatalog();
 
-        ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
+        AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
+        auditEventBuilder.reset();
+        auditEventBuilder
+                .setTimestamp(ctx.getStartTime())
+                .setClientIp(ctx.getClientIP())
+                
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
+                .setSqlHash(ctx.getSqlHash())
+                .setEventType(EventType.AFTER_QUERY)
+                .setCtl(catalog == null ? 
InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
                 .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
                 .setState(ctx.getState().toString())
                 .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : 
ctx.getState().getErrorCode().getCode())
@@ -73,10 +85,10 @@ public class AuditLogHelper {
 
                 if (elapseMs > Config.qe_slow_log_ms) {
                     String sqlDigest = DigestUtils.md5Hex(((Queriable) 
parsedStmt).toDigest());
-                    ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
+                    auditEventBuilder.setSqlDigest(sqlDigest);
                 }
             }
-            ctx.getAuditEventBuilder().setIsQuery(true);
+            auditEventBuilder.setIsQuery(true);
             if (ctx.getQueryDetail() != null) {
                 ctx.getQueryDetail().setEventTime(endTime);
                 ctx.getQueryDetail().setEndTime(endTime);
@@ -90,35 +102,35 @@ public class AuditLogHelper {
                 ctx.setQueryDetail(null);
             }
         } else {
-            ctx.getAuditEventBuilder().setIsQuery(false);
+            auditEventBuilder.setIsQuery(false);
         }
-        ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
+        auditEventBuilder.setIsNereids(ctx.getState().isNereids);
 
-        
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
+        auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress());
 
         // We put origin query stmt at the end of audit log, for parsing the 
log more convenient.
         if (!ctx.getState().isQuery() && (parsedStmt != null && 
parsedStmt.needAuditEncryption())) {
-            ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
+            auditEventBuilder.setStmt(parsedStmt.toSql());
         } else {
             if (parsedStmt instanceof InsertStmt && !((InsertStmt) 
parsedStmt).needLoadManager()
                     && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
                 // INSERT INTO VALUES may be very long, so we only log at most 
1K bytes.
                 int length = Math.min(1024, origStmt.length());
-                ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, 
length));
+                auditEventBuilder.setStmt(origStmt.substring(0, length));
             } else {
-                ctx.getAuditEventBuilder().setStmt(origStmt);
+                auditEventBuilder.setStmt(origStmt);
             }
         }
         if (!Env.getCurrentEnv().isMaster()) {
             if (ctx.executor.isForwardToMaster()) {
-                
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
+                auditEventBuilder.setState(ctx.executor.getProxyStatus());
                 int proxyStatusCode = ctx.executor.getProxyStatusCode();
                 if (proxyStatusCode != 0) {
-                    ctx.getAuditEventBuilder().setErrorCode(proxyStatusCode);
-                    
ctx.getAuditEventBuilder().setErrorMessage(ctx.executor.getProxyErrMsg());
+                    auditEventBuilder.setErrorCode(proxyStatusCode);
+                    
auditEventBuilder.setErrorMessage(ctx.executor.getProxyErrMsg());
                 }
             }
         }
-        
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
+        
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 59833d0ea5d..66ec5c2ce0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -48,7 +48,7 @@ import org.apache.doris.mysql.MysqlSslContext;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
-import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
+import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
 import org.apache.doris.statistics.ColumnStatistic;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index e4d8f8273f8..b297d192c42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -28,7 +28,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -181,12 +180,6 @@ public abstract class ConnectProcessor {
         String convertedStmt = convertOriginStmt(originStmt);
         String sqlHash = DigestUtils.md5Hex(convertedStmt);
         ctx.setSqlHash(sqlHash);
-        ctx.getAuditEventBuilder().reset();
-        ctx.getAuditEventBuilder()
-                .setTimestamp(System.currentTimeMillis())
-                .setClientIp(ctx.getClientIP())
-                
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
-                .setSqlHash(ctx.getSqlHash());
 
         List<StatementBase> stmts = null;
         Exception nereidsParseException = null;
@@ -291,7 +284,7 @@ public abstract class ConnectProcessor {
     private String convertOriginStmt(String originStmt) {
         String convertedStmt = originStmt;
         @Nullable Dialect sqlDialect = Dialect.getByName(ctx.getSessionVariable().getSqlDialect());
-        if (sqlDialect != null) {
+        if (sqlDialect != null && sqlDialect != Dialect.DORIS) {
             PluginMgr pluginMgr = Env.getCurrentEnv().getPluginMgr();
             List<DialectConverterPlugin> plugins = pluginMgr.getActiveDialectPluginList(sqlDialect);
             for (DialectConverterPlugin plugin : plugins) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
index 419bb1377a5..a8374a7b440 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
@@ -50,6 +50,12 @@ public final class GlobalVariable {
     public static final long VALIDATE_PASSWORD_POLICY_DISABLED = 0;
     public static final long VALIDATE_PASSWORD_POLICY_STRONG = 2;
 
+    public static final String SQL_CONVERTER_SERVICE_URL = "sql_converter_service_url";
+    public static final String ENABLE_AUDIT_PLUGIN = "enable_audit_plugin";
+    public static final String AUDIT_PLUGIN_MAX_BATCH_BYTES = "audit_plugin_max_batch_bytes";
+    public static final String AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC = "audit_plugin_max_batch_interval_sec";
+    public static final String AUDIT_PLUGIN_MAX_SQL_LENGTH = "audit_plugin_max_sql_length";
+
     @VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY)
     public static String versionComment = "Doris version "
             + Version.DORIS_BUILD_VERSION + "-" + 
Version.DORIS_BUILD_SHORT_HASH;
@@ -104,7 +110,22 @@ public final class GlobalVariable {
     @VariableMgr.VarAttr(name = SHOW_FULL_DBNAME_IN_INFO_SCHEMA_DB, flag = 
VariableMgr.GLOBAL)
     public static boolean showFullDbNameInInfoSchemaDb = false;
 
-    // Don't allow to create instance.
+    @VariableMgr.VarAttr(name = SQL_CONVERTER_SERVICE_URL, flag = 
VariableMgr.GLOBAL)
+    public static String sqlConverterServiceUrl = "";
+
+    @VariableMgr.VarAttr(name = ENABLE_AUDIT_PLUGIN, flag = VariableMgr.GLOBAL)
+    public static boolean enableAuditLoader = false;
+
+    @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_BYTES, flag = 
VariableMgr.GLOBAL)
+    public static long auditPluginMaxBatchBytes = 50 * 1024 * 1024;
+
+    @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC, flag = 
VariableMgr.GLOBAL)
+    public static long auditPluginMaxBatchInternalSec = 60;
+
+    @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_SQL_LENGTH, flag = 
VariableMgr.GLOBAL)
+    public static int auditPluginMaxSqlLength = 4096;
+
+    // Don't allow creating instance.
     private GlobalVariable() {
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 085d844e616..1f8cef92714 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -20,7 +20,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.Daemon;
-import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.audit.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java
index 2f0a720ca7e..b717b2bbf35 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java
@@ -61,7 +61,7 @@ public class DialectPluginTest extends TestWithFeService {
 
     @Test
     public void testSparkPlugin() {
-        
ConnectContext.get().getSessionVariable().setSqlDialect(Dialect.SPARK_SQL.getDialectName());
+        
ConnectContext.get().getSessionVariable().setSqlDialect(Dialect.SPARK.getDialectName());
         NereidsParser parser = new NereidsParser();
         List<StatementBase> stmts = parser.parseSQL(TEST_SQL, 
ConnectContext.get().getSessionVariable());
         Assertions.assertEquals(1, stmts.size());
diff --git 
a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java
 b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
similarity index 90%
rename from 
fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java
rename to 
fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
index 532d660f432..ac40c353769 100644
--- 
a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin.dialect.http;
+package org.apache.doris.plugin;
+
+import org.apache.doris.plugin.dialect.HttpDialectUtils;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -52,20 +54,20 @@ public class HttpDialectUtilsTest {
         String expectedSql = "select * from t1 where `k1` = 1";
 
         String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert";
-        String res = HttpDialectUtils.convertSql(targetURL, originSql);
+        String res = HttpDialectUtils.convertSql(targetURL, originSql, 
"presto");
         Assert.assertEquals(originSql, res);
         // test presto
         server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql 
+ "\", \"code\": 0, \"message\": \"\"}");
-        res = HttpDialectUtils.convertSql(targetURL, originSql);
+        res = HttpDialectUtils.convertSql(targetURL, originSql, "presto");
         Assert.assertEquals(expectedSql, res);
         // test response version error
         server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql 
+ "\", \"code\": 0, \"message\": \"\"}");
-        res = HttpDialectUtils.convertSql(targetURL, originSql);
+        res = HttpDialectUtils.convertSql(targetURL, originSql, "presto");
         Assert.assertEquals(originSql, res);
         // 7. test response code error
         server.setResponse(
                 "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", 
\"code\": 400, \"message\": \"\"}");
-        res = HttpDialectUtils.convertSql(targetURL, originSql);
+        res = HttpDialectUtils.convertSql(targetURL, originSql, "presto");
         Assert.assertEquals(originSql, res);
     }
 
diff --git 
a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java
 b/fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java
similarity index 98%
rename from 
fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java
rename to fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java
index 10ae33e4358..dd6ad991e06 100644
--- 
a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.plugin.dialect.http;
+package org.apache.doris.plugin;
 
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java 
b/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java
index 05a6b34b978..27fc36b9c55 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java
@@ -32,7 +32,7 @@ public class TestDialectPlugin1 extends Plugin implements 
DialectConverterPlugin
 
     @Override
     public ImmutableSet<Dialect> acceptDialects() {
-        return ImmutableSet.of(Dialect.SPARK_SQL);
+        return ImmutableSet.of(Dialect.SPARK);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
index 465bbef3d64..26c51fb66db 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
@@ -19,9 +19,10 @@ package org.apache.doris.qe;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.util.DigitalVersion;
-import org.apache.doris.plugin.AuditEvent;
-import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.plugin.audit.AuditEvent;
+import org.apache.doris.plugin.audit.AuditEvent.EventType;
+import org.apache.doris.plugin.audit.AuditLogBuilder;
 import org.apache.doris.utframe.UtFrameUtils;
 
 import org.junit.AfterClass;
diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 4c5586ed881..ed4b7efc651 100755
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -219,7 +219,7 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
                     } catch (Exception e) {
                         LOG.error("Failed to get auth token: {}", e);
                     }
-                }  
+                }
                 DorisStreamLoader.LoadResponse response = 
loader.loadBatch(logBuffer, slowLog, token);
                 LOG.debug("audit loader response: {}", response);
             } catch (Exception e) {
diff --git a/fe_plugins/http-dialect-converter/pom.xml 
b/fe_plugins/http-dialect-converter/pom.xml
deleted file mode 100644
index 2486ec1b24b..00000000000
--- a/fe_plugins/http-dialect-converter/pom.xml
+++ /dev/null
@@ -1,119 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.doris</groupId>
-        <artifactId>fe-plugins</artifactId>
-        <version>1.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>http-dialect-converter</artifactId>
-    <packaging>jar</packaging>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>fe-core</artifactId>
-            <version>${doris.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>fe-common</artifactId>
-            <version>${doris.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <version>${lombok.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <!-- 
https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-        </dependency>
-        <!-- 
https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-        </dependency>
-        <!-- 
https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-slf4j-impl</artifactId>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-        </dependency>
-        <!-- 
https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine -->
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <!-- 
https://mvnrepository.com/artifact/org.junit.vintage/junit-vintage-engine -->
-        <dependency>
-            <groupId>org.junit.vintage</groupId>
-            <artifactId>junit-vintage-engine</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <!-- 
https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-params -->
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-params</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.jmockit/jmockit -->
-        <dependency>
-            <groupId>org.jmockit</groupId>
-            <artifactId>jmockit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <finalName>presto-converter</finalName>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4.1</version>
-                <configuration>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <descriptors>
-                        <descriptor>src/main/assembly/zip.xml</descriptor>
-                    </descriptors>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf 
b/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf
deleted file mode 100755
index cb298280396..00000000000
--- a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf
+++ /dev/null
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-### plugin configuration
-# Replace the target url, set your sql converter service url here
-target_url=http://127.0.0.1:8080/api/v1/sql/convert
-# Replace the dialects if you need, use comma to split the value, for instance: presto,trino,hive
-accept_dialects=presto
\ No newline at end of file
diff --git 
a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties 
b/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties
deleted file mode 100755
index d06bf1c6acc..00000000000
--- a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-name=HttpDialectConverter
-type=DIALECT
-description=SQL dialect converter plugin using http protocol.
-version=1.0.0
-java.version=1.8.0
-classname=org.apache.doris.plugin.dialect.http.HttpDialectConverterPlugin
diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml 
b/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml
deleted file mode 100644
index 515e68751b0..00000000000
--- a/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<assembly>
-    <id>plugin</id>
-    <formats>
-        <format>zip</format>
-    </formats>
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <fileSets>
-        <fileSet>
-            <directory>target</directory>
-            <includes>
-                <include>*.jar</include>
-            </includes>
-            <outputDirectory>/</outputDirectory>
-        </fileSet>
-        <fileSet>
-            <directory>src/main/assembly</directory>
-            <includes>
-                <include>plugin.properties</include>
-                <include>plugin.conf</include>
-            </includes>
-            <outputDirectory>/</outputDirectory>
-        </fileSet>
-    </fileSets>
-</assembly>
diff --git a/fe_plugins/pom.xml b/fe_plugins/pom.xml
index 9ace25e2719..ef2ade6f676 100644
--- a/fe_plugins/pom.xml
+++ b/fe_plugins/pom.xml
@@ -65,7 +65,6 @@ under the License.
     <modules>
         <module>auditdemo</module>
         <module>auditloader</module>
-        <module>http-dialect-converter</module>
         <module>trino-converter</module>
         <module>sparksql-converter</module>
     </modules>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to