This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a2e0f464dc [INLONG-10085][Audit] Optimize the performance of the audit-service (#10088)
a2e0f464dc is described below

commit a2e0f464dc649268e4bdeb5d1df8b46bbb2e5ef1
Author: doleyzi <[email protected]>
AuthorDate: Fri Apr 26 20:56:00 2024 +0800

    [INLONG-10085][Audit] Optimize the performance of the audit-service (#10088)
---
 inlong-audit/audit-service/pom.xml                 |   6 +
 .../apache/inlong/audit/cache/AbstractCache.java   |  62 +++++++--
 .../apache/inlong/audit/cache/RealTimeQuery.java   | 134 +++++++++++-------
 .../inlong/audit/config/ConfigConstants.java       |  13 ++
 .../inlong/audit/config/OpenApiConstants.java      |   7 +-
 .../apache/inlong/audit/config/SqlConstants.java   |  78 ++++++-----
 .../apache/inlong/audit/entities/SinkConfig.java   |   2 +-
 .../apache/inlong/audit/entities/SourceConfig.java |   3 +
 .../org/apache/inlong/audit/entities/StatData.java |   3 +-
 .../org/apache/inlong/audit/main/Application.java  |  66 +++++++++
 .../inlong/audit/selector/impl/DBDataSource.java   |   4 +-
 .../apache/inlong/audit/service/ApiService.java    | 153 ++++++++++++---------
 .../apache/inlong/audit/service/EtlService.java    |  67 +--------
 .../org/apache/inlong/audit/sink/JdbcSink.java     |   2 +-
 .../org/apache/inlong/audit/source/JdbcSource.java |   8 +-
 15 files changed, 379 insertions(+), 229 deletions(-)

diff --git a/inlong-audit/audit-service/pom.xml b/inlong-audit/audit-service/pom.xml
index 2a67f41afc..e71c33586b 100644
--- a/inlong-audit/audit-service/pom.xml
+++ b/inlong-audit/audit-service/pom.xml
@@ -86,6 +86,12 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <build>
         <resources>
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
index b36d4b53e3..3ad1a43bff 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -20,19 +20,23 @@ package org.apache.inlong.audit.cache;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.CacheUtils;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
@@ -50,6 +54,9 @@ public class AbstractCache {
     protected AuditCycle auditCycle;
     private static final int DEFAULT_MONITOR_INTERVAL = 1;
 
+    // According to the startTime and endTime of the request parameters, the 
maximum number of cache keys generated.
+    private static final int MAX_CACHE_KEY_SIZE = 1440;
+
     protected AbstractCache(AuditCycle auditCycle) {
         cache = Caffeine.newBuilder()
                 
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
@@ -77,18 +84,55 @@ public class AbstractCache {
     }
 
     /**
-     * Get data
      *
-     * @param key
+     * @param startTime
+     * @param endTime
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param auditId
+     * @param auditTag
      * @return
      */
-    public List<StatData> getData(String key) {
-        StatData statData = cache.getIfPresent(key);
-        if (null == statData) {
-            // Compatible with scenarios where the auditTag openapi parameter 
can be empty.
-            statData = cache.getIfPresent(key + DEFAULT_AUDIT_TAG);
+    public List<StatData> getData(String startTime, String endTime, String 
inlongGroupId,
+            String inlongStreamId, String auditId, String auditTag) {
+        List<StatData> result = new LinkedList<>();
+        List<String> keyList = buildCacheKeyList(startTime, endTime, 
inlongGroupId,
+                inlongStreamId, auditId, auditTag);
+        for (String cacheKey : keyList) {
+            StatData statData = cache.getIfPresent(cacheKey);
+            if (null == statData) {
+                // Compatible with scenarios where the auditTag openapi 
parameter can be empty.
+                statData = cache.getIfPresent(cacheKey + DEFAULT_AUDIT_TAG);
+            }
+            if (null != statData) {
+                result.add(statData);
+            }
+        }
+        return result;
+    }
+
+    private List<String> buildCacheKeyList(String startTime, String endTime, 
String inlongGroupId,
+            String inlongStreamId, String auditId, String auditTag) {
+        List<String> keyList = new LinkedList<>();
+        try {
+            SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+            Date startDate = dateFormat.parse(startTime);
+            Date endDate = dateFormat.parse(endTime);
+            for (int index = 0; index < MAX_CACHE_KEY_SIZE; index++) {
+                Calendar calendar = Calendar.getInstance();
+                calendar.setTime(startDate);
+                calendar.add(Calendar.MINUTE, index * auditCycle.getValue());
+                calendar.set(Calendar.SECOND, 0);
+                if (calendar.getTime().compareTo(endDate) > 0) {
+                    break;
+                }
+                String time = dateFormat.format(calendar.getTime());
+                keyList.add(CacheUtils.buildCacheKey(time, inlongGroupId, 
inlongStreamId, auditId, auditTag));
+            }
+        } catch (Exception exception) {
+            LOGGER.error("It has exception when build cache key list!", 
exception);
         }
-        return statData == null ? new LinkedList<>() : 
Collections.singletonList(statData);
+        return keyList;
     }
 
     /**
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
index b94388a43e..ff9de5c6d5 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java
@@ -21,9 +21,9 @@ import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.JdbcConfig;
 import org.apache.inlong.audit.entities.StatData;
 import org.apache.inlong.audit.service.ConfigService;
+import org.apache.inlong.audit.utils.CacheUtils;
 
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.dbcp.BasicDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,22 +33,21 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
-import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
 import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
@@ -64,7 +63,7 @@ public class RealTimeQuery {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RealTimeQuery.class);
     private static volatile RealTimeQuery realTimeQuery = null;
 
-    private final List<DataSource> dataSourceList = new LinkedList<>();
+    private final List<BasicDataSource> dataSourceList = new LinkedList<>();
 
     private final String queryLogTsSql;
     private final String queryIdsByIpSql;
@@ -73,7 +72,26 @@ public class RealTimeQuery {
     private RealTimeQuery() {
         List<JdbcConfig> jdbcConfigList = 
ConfigService.getInstance().getAllAuditSource();
         for (JdbcConfig jdbcConfig : jdbcConfigList) {
-            dataSourceList.add(createDataSource(jdbcConfig));
+            BasicDataSource dataSource = new BasicDataSource();
+            dataSource.setDriverClassName(jdbcConfig.getDriverClass());
+            dataSource.setUrl(jdbcConfig.getJdbcUrl());
+            dataSource.setUsername(jdbcConfig.getUserName());
+            dataSource.setPassword(jdbcConfig.getPassword());
+            
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+                    DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+            
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
+                    DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
+            
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
+                    DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
+            
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
+                    DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
+            dataSource.setTestOnBorrow(true);
+            dataSource.setValidationQuery("SELECT 1");
+            dataSource
+                    
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
+                            DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));
+
+            dataSourceList.add(dataSource);
         }
 
         queryLogTsSql = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
@@ -95,29 +113,6 @@ public class RealTimeQuery {
         return realTimeQuery;
     }
 
-    /**
-     * Create data source.
-     */
-    private DataSource createDataSource(JdbcConfig jdbcConfig) {
-        HikariConfig config = new HikariConfig();
-        config.setDriverClassName(jdbcConfig.getDriverClass());
-        config.setJdbcUrl(jdbcConfig.getJdbcUrl());
-        config.setUsername(jdbcConfig.getUserName());
-        config.setPassword(jdbcConfig.getPassword());
-        
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
-                DEFAULT_CONNECTION_TIMEOUT));
-        config.addDataSourceProperty(CACHE_PREP_STMTS,
-                Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, 
DEFAULT_CACHE_PREP_STMTS));
-        config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
-                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, 
DEFAULT_PREP_STMT_CACHE_SIZE));
-        config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
-                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, 
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
-        config.setMaximumPoolSize(
-                Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
-                        DEFAULT_DATASOURCE_POOL_SIZE));
-        return new HikariDataSource(config);
-    }
-
     /**
      * Query the audit data of log time.
      *
@@ -130,17 +125,57 @@ public class RealTimeQuery {
      */
     public List<StatData> queryLogTs(String startTime, String endTime, String 
inlongGroupId,
             String inlongStreamId, String auditId) {
+        long currentTime = System.currentTimeMillis();
         List<StatData> statDataList = new LinkedList<>();
+        if (dataSourceList.isEmpty()) {
+            return statDataList;
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
         for (DataSource dataSource : dataSourceList) {
-            statDataList =
-                    doQueryLogTs(dataSource, startTime, endTime, 
inlongGroupId, inlongStreamId, auditId);
-            if (!statDataList.isEmpty()) {
-                break;
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                List<StatData> statDataListTemp =
+                        doQueryLogTs(dataSource, startTime, endTime, 
inlongGroupId, inlongStreamId, auditId);
+                statDataList.addAll(statDataListTemp);
+            });
+            futures.add(future);
+        }
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()])).join();
+        LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", 
startTime, endTime, inlongGroupId,
+                inlongStreamId, auditId, System.currentTimeMillis() - 
currentTime);
+        return filterMaxAuditVersion(statDataList);
+    }
+
+    /**
+     * @param allStatData
+     * @return
+     */
+    public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
+        HashMap<String, List<StatData>> allData = new HashMap<>();
+        for (StatData statData : allStatData) {
+            String dataKey = CacheUtils.buildCacheKey(
+                    statData.getLogTs(),
+                    statData.getInlongGroupId(),
+                    statData.getInlongStreamId(),
+                    statData.getAuditId(),
+                    statData.getAuditTag());
+            List<StatData> statDataList = allData.computeIfAbsent(dataKey, k 
-> new LinkedList<>());
+            statDataList.add(statData);
+        }
+        List<StatData> result = new LinkedList<>();
+        for (Map.Entry<String, List<StatData>> entry : allData.entrySet()) {
+            long maxAuditVersion = Long.MIN_VALUE;
+            for (StatData maxData : entry.getValue()) {
+                maxAuditVersion =
+                        maxData.getAuditVersion() > maxAuditVersion ? 
maxData.getAuditVersion() : maxAuditVersion;
+            }
+            for (StatData statData : entry.getValue()) {
+                if (statData.getAuditVersion() == maxAuditVersion) {
+                    result.add(statData);
+                    break;
+                }
             }
-            LOGGER.info("Change another audit source to query data! Params is: 
{} {} {} {} {}",
-                    startTime, endTime, inlongGroupId, inlongStreamId, 
auditId);
         }
-        return statDataList;
+        return result;
     }
 
     /**
@@ -175,6 +210,7 @@ public class RealTimeQuery {
                     data.setCount(resultSet.getLong(6));
                     data.setSize(resultSet.getLong(7));
                     data.setDelay(resultSet.getLong(8));
+                    data.setAuditVersion(resultSet.getLong(9));
                     result.add(data);
                 }
             } catch (SQLException sqlException) {
@@ -203,6 +239,8 @@ public class RealTimeQuery {
                 break;
             }
         }
+        LOGGER.info("Query ids by params:{} {} {} {}, result size:{} ", 
startTime,
+                endTime, ip, auditId, statDataList.size());
         return statDataList;
     }
 
@@ -265,6 +303,8 @@ public class RealTimeQuery {
                 break;
             }
         }
+        LOGGER.info("Query ips by params:{} {} {} {} {}, result size:{} ",
+                startTime, endTime, inlongGroupId, inlongStreamId, auditId, 
statDataList.size());
         return statDataList;
     }
 
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index 8d7ddf7f80..57701790d1 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -29,11 +29,24 @@ public class ConfigConstants {
     public static final String KEY_MYSQL_USERNAME = "mysql.username";
     public static final String KEY_MYSQL_PASSWORD = "mysql.password";
 
+    public static final String KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS = 
"datasource.max.total.connections";
+    public static final int DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS = 10;
+
+    public static final String KEY_DATASOURCE_MAX_IDLE_CONNECTIONS = 
"datasource.max.idle.connections";
+    public static final int DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS = 2;
+
+    public static final String KEY_DATASOURCE_MIN_IDLE_CONNECTIONS = 
"datasource.min.idle.connections";
+    public static final int DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS = 1;
+
+    public static final String KEY_DATASOURCE_DETECT_INTERVAL_MS = 
"datasource.detect.interval.ms";
+    public static final int DEFAULT_DATASOURCE_DETECT_INTERVAL_MS = 60000;
+
     // Time config
     public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT = 
"datasource.connection.timeout.ms";
     public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5;
     public static final String KEY_QUEUE_PULL_TIMEOUT = 
"queue.pull.timeout.ms";
     public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
+    public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
     // Interval config
     public static final String KEY_SOURCE_DB_STAT_INTERVAL = 
"source.db.stat.interval.minute";
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
index c2f4fd3d71..05643c43bf 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java
@@ -33,8 +33,8 @@ public class OpenApiConstants {
     public static final String DEFAULT_API_GET_IPS_PATH = 
"/audit/query/getIps";
     public static final String KEY_API_GET_IDS_PATH = "api.get.ids.path";
     public static final String DEFAULT_API_GET_IDS_PATH = 
"/audit/query/getIds";
-    public static final String KEY_API_POOL_SIZE = "api.pool.size";
-    public static final int DEFAULT_POOL_SIZE = 10;
+    public static final String KEY_API_THREAD_POOL_SIZE = 
"api.thread.pool.size";
+    public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
     public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
     public static final int DEFAULT_API_BACKLOG_SIZE = 100;
     public static final String KEY_API_REAL_LIMITER_QPS = 
"api.real.limiter.qps";
@@ -61,7 +61,8 @@ public class OpenApiConstants {
     public static final String KEY_HTTP_BODY_ERR_DATA = "data";
     public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
     public static final String VALUE_HTTP_HEADER_CONTENT_TYPE = 
"application/json;charset=utf-8";
-    public static final int BIND_PORT = 80;
+    public static final String KEY_HTTP_SERVER_BIND_PORT = 
"api.http.server.bind.port";
+    public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
     public static final int HTTP_RESPOND_CODE = 200;
     public static final String DEFAULT_PARAMS_AUDIT_TAG = "";
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 648ed1d72a..884172d32a 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -37,25 +37,38 @@ public class SqlConstants {
     // Source query sql
     public static final String KEY_SOURCE_STAT_SQL = "source.stat.sql";
     public static final String DEFAULT_SOURCE_STAT_SQL =
-            "SELECT inlong_group_id, inlong_stream_id, audit_id\n" +
-                    "\t, audit_tag, cnt, size, delay,MAX(audit_version)\n" +
+            "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+                    "\t, SUM(cnt) AS cnt, SUM(size) AS size\n" +
+                    "\t, SUM(delay) AS delay\n" +
                     "FROM (\n" +
-                    "\tSELECT audit_version, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
-                    "\t\t, SUM(count) AS cnt, SUM(size) AS size\n" +
-                    "\t\t, SUM(delay) AS delay\n" +
+                    "\tSELECT t_all_version.log_ts, 
t_all_version.inlong_group_id, t_all_version.inlong_stream_id, 
t_all_version.audit_id, t_all_version.audit_tag\n"
+                    +
+                    "\t\t, t_all_version.cnt, t_all_version.size, 
t_all_version.delay\n" +
                     "\tFROM (\n" +
-                    "\t\tSELECT audit_version, docker_id, thread_id, sdk_ts, 
packet_id\n" +
-                    "\t\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, 
audit_id\n" +
-                    "\t\t\t, audit_tag, count, size, delay\n" +
+                    "\t\tSELECT audit_version, log_ts, inlong_group_id, 
inlong_stream_id, audit_id\n" +
+                    "\t\t\t, audit_tag, SUM(count) AS cnt, SUM(size) AS 
size\n" +
+                    "\t\t\t, SUM(delay) AS delay\n" +
                     "\t\tFROM audit_data\n" +
-                    "\t\tWHERE log_ts BETWEEN ? AND ? \n" +
-                    "\t\t\tAND audit_id = ? \n" +
-                    "\t\tGROUP BY audit_version, docker_id, thread_id, sdk_ts, 
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, 
count, size, delay\n "
-                    +
-                    "\t) t1\n" +
-                    "\tGROUP BY audit_version, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
-                    ") t2\n" +
-                    "GROUP BY inlong_group_id, inlong_stream_id, audit_id, 
audit_tag, cnt, size, delay";
+                    "\t\tWHERE log_ts BETWEEN ? AND ?\n" +
+                    "\t\t\tAND audit_id = ?\n" +
+                    "\t\tGROUP BY audit_version, log_ts, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
+                    "\t) t_all_version\n" +
+                    "\t\tJOIN (\n" +
+                    "\t\t\tSELECT max(audit_version) AS audit_version, log_ts, 
inlong_group_id, inlong_stream_id\n" +
+                    "\t\t\t\t, audit_id, audit_tag\n" +
+                    "\t\t\tFROM audit_data\n" +
+                    "\t\t\tWHERE log_ts BETWEEN ? AND ?\n" +
+                    "\t\t\t\tAND audit_id = ?\n" +
+                    "\t\t\tGROUP BY log_ts, inlong_group_id, inlong_stream_id, 
audit_id, audit_tag\n" +
+                    "\t\t) t_max_version\n" +
+                    "\t\tON t_all_version.audit_version = 
t_max_version.audit_version\n" +
+                    "\t\t\tAND t_all_version.log_ts = t_max_version.log_ts\n" +
+                    "\t\t\tAND t_all_version.inlong_group_id = 
t_max_version.inlong_group_id\n" +
+                    "\t\t\tAND t_all_version.inlong_stream_id = 
t_max_version.inlong_stream_id\n" +
+                    "\t\t\tAND t_all_version.audit_id = 
t_max_version.audit_id\n" +
+                    "\t\t\tAND t_all_version.audit_tag = 
t_max_version.audit_tag\n" +
+                    ") t_sum\n" +
+                    "GROUP BY inlong_group_id, inlong_stream_id, audit_id, 
audit_tag";
 
     public static final String KEY_SOURCE_QUERY_IPS_SQL = 
"source.query.ips.sql";
     public static final String DEFAULT_SOURCE_QUERY_IPS_SQL =
@@ -82,27 +95,22 @@ public class SqlConstants {
     public static final String KEY_SOURCE_QUERY_MINUTE_SQL = 
"source.query.minute.sql";
     public static final String DEFAULT_SOURCE_QUERY_MINUTE_SQL =
             "SELECT log_ts, inlong_group_id, inlong_stream_id, audit_id, 
audit_tag\n" +
-                    "\t, cnt, size, delay, max(audit_version)\n" +
+                    "\t, sum(count) AS cnt, sum(size) AS size\n" +
+                    "\t, sum(delay) AS delay, audit_version\n" +
                     "FROM (\n" +
-                    "\tSELECT audit_version, log_ts, inlong_group_id, 
inlong_stream_id, audit_id\n" +
-                    "\t\t, audit_tag, sum(count) AS cnt, sum(size) AS size\n" +
-                    "\t\t, sum(delay) AS delay\n" +
-                    "\tFROM (\n" +
-                    "\t\tSELECT audit_version, docker_id, thread_id, sdk_ts, 
packet_id\n" +
-                    "\t\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, 
audit_id\n" +
-                    "\t\t\t, audit_tag, count, size, delay\n" +
-                    "\t\tFROM audit_data\n" +
-                    "\t\tWHERE log_ts BETWEEN ? AND ?\n" +
-                    "\t\t\tAND inlong_group_id = ?\n" +
-                    "\t\t\tAND inlong_stream_id = ?\n" +
-                    "\t\t\tAND audit_id = ? \n" +
-                    "\t\tGROUP BY audit_version, docker_id, thread_id, sdk_ts, 
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, 
count, size, delay\n"
+                    "\tSELECT audit_version, docker_id, thread_id, sdk_ts, 
packet_id\n" +
+                    "\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, 
audit_id\n" +
+                    "\t\t, audit_tag, count, size, delay\n" +
+                    "\tFROM audit_data\n" +
+                    "\tWHERE log_ts BETWEEN ? AND ?\n" +
+                    "\t\tAND inlong_group_id = ?\n" +
+                    "\t\tAND inlong_stream_id = ?\n" +
+                    "\t\tAND audit_id = ?\n" +
+                    "\tGROUP BY audit_version, docker_id, thread_id, sdk_ts, 
packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, 
count, size, delay\n"
                     +
-                    "\t) t1\n" +
-                    "\tGROUP BY audit_version, log_ts, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
-                    ") t2\n" +
-                    "GROUP BY log_ts, inlong_group_id, inlong_stream_id, 
audit_id, audit_tag, cnt, size, delay \n" +
-                    "limit 1440 ";
+                    ") t_distinct\n" +
+                    "GROUP BY audit_version, log_ts, inlong_group_id, 
inlong_stream_id, audit_id, audit_tag\n" +
+                    "LIMIT 1440";
 
     // Mysql query sql
     public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL = 
"mysql.query.temp.sql";
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
index d2e137ec83..3cb15a9016 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java
@@ -30,7 +30,7 @@ public class SinkConfig {
     private String insertSql;
     private final String driverClassName;
     private final String jdbcUrl;
-    private final String username;
+    private final String userName;
     private final String password;
 
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
index 428ca0389f..88730a203a 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.audit.entities;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 
 /**
  * Source config
  */
 @Data
+@AllArgsConstructor
 public class SourceConfig {
 
     private AuditCycle auditCycle;
@@ -32,6 +34,7 @@ public class SourceConfig {
     private final String jdbcUrl;
     private final String username;
     private final String password;
+    private boolean needJoin = false;
 
     public SourceConfig(AuditCycle auditCycle,
             String querySql,
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
index f8f8981f58..910aec7ce8 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/StatData.java
@@ -24,7 +24,7 @@ import java.sql.Timestamp;
 @Data
 public class StatData {
 
-    private String auditVersion;
+    private long auditVersion;
     private String logTs;
     private String inlongGroupId;
     private String inlongStreamId;
@@ -35,4 +35,5 @@ public class StatData {
     private Long delay;
     private Timestamp updateTime;
     private String ip;
+    private String sourceName;
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
index ecd13953fc..067667133d 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
@@ -17,18 +17,34 @@
 
 package org.apache.inlong.audit.main;
 
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.selector.api.Selector;
+import org.apache.inlong.audit.selector.api.SelectorConfig;
+import org.apache.inlong.audit.selector.api.SelectorFactory;
 import org.apache.inlong.audit.service.ApiService;
 import org.apache.inlong.audit.service.ConfigService;
 import org.apache.inlong.audit.service.EtlService;
+import org.apache.inlong.audit.utils.JdbcUtils;
+import org.apache.inlong.common.util.NetworkUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.UUID;
+
+import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
+
 public class Application {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
     private static final EtlService etlService = new EtlService();
     private static final ApiService apiService = new ApiService();
+    private static Selector selector;
+    private static boolean running = true;
 
     public static void main(String[] args) {
         try {
@@ -41,7 +57,11 @@ public class Application {
             // Api service provide audit data interface to external services
             apiService.start();
 
+            // Cleanup resource when program exit.
             stopIfKilled();
+
+            // Waiting to become the leader node.
+            waitToBeLeader();
         } catch (Exception ex) {
             LOGGER.error("Running exception: ", ex);
         }
@@ -50,12 +70,58 @@ public class Application {
     private static void stopIfKilled() {
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
+                running = false;
                 etlService.stop();
                 apiService.stop();
+                selector.close();
                 LOGGER.info("Stopping gracefully");
             } catch (Exception ex) {
                 LOGGER.error("Stop error: ", ex);
             }
         }));
     }
+
+    /**
+     * Init selector
+     */
+    private static void initSelector() {
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+        String leaderId = NetworkUtils.getLocalIp() + "-" + UUID.randomUUID();
+        LOGGER.info("Init selector. Leader id is :{}", leaderId);
+        if (selector == null) {
+            SelectorConfig electorConfig = new SelectorConfig(
+                    Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID, DEFAULT_SELECTOR_SERVICE_ID),
+                    leaderId,
+                    jdbcConfig.getJdbcUrl(),
+                    jdbcConfig.getUserName(), jdbcConfig.getPassword(), jdbcConfig.getDriverClass());
+
+            selector = SelectorFactory.getNewElector(electorConfig);
+            try {
+                selector.init();
+            } catch (Exception e) {
+                LOGGER.error("Init selector has exception:", e);
+            }
+        }
+    }
+
+    /**
+     * Wait to be leader
+     */
+    private static void waitToBeLeader() {
+        initSelector();
+        while (running) {
+            try {
+                Thread.sleep(Configuration.getInstance().get(KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS,
+                        DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS));
+            } catch (Exception e) {
+                LOGGER.error("Wait to be Leader has exception! lost Leadership!", e);
+            }
+
+            if (selector.isLeader()) {
+                LOGGER.info("I get Leadership! Begin to aggregate clickhouse data to mysql");
+                etlService.auditSourceToMysql();
+                return;
+            }
+        }
+    }
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
index 889cf87cb2..14340be873 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/selector/impl/DBDataSource.java
@@ -165,9 +165,9 @@ public class DBDataSource {
             try {
                 int result = executeUpdate(selectorSql);
                 if (result == 2) {
-                    LOGGER.info("{} get the leader", selectorConfig.getLeaderId());
+                    LOGGER.info("{} become the leader", selectorConfig.getLeaderId());
                 } else if (result == 1) {
-                    LOGGER.info("{} do not get the leader", selectorConfig.getLeaderId());
+                    LOGGER.info("{} waiting to be the leader", selectorConfig.getLeaderId());
                 }
             } catch (Exception exception) {
                 LOGGER.error("Exception: {} ,sql:{}", exception.getMessage(), selectorSql);
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index 3b7c2a8264..d8dcb7ebbf 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -26,7 +26,6 @@ import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.ApiType;
 import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.StatData;
-import org.apache.inlong.audit.utils.CacheUtils;
 
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.Gson;
@@ -44,9 +43,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static org.apache.inlong.audit.config.OpenApiConstants.BIND_PORT;
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH;
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH;
@@ -54,8 +53,9 @@ import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IP
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH;
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
+import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
 import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_PARAMS_AUDIT_TAG;
-import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_POOL_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
@@ -63,12 +63,13 @@ import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PA
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH;
-import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_POOL_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
+import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID;
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG;
@@ -87,7 +88,6 @@ import static org.apache.inlong.audit.entities.ApiType.MINUTES;
 public class ApiService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ApiService.class);
-
     public void start() {
         initHttpServer();
     }
@@ -97,11 +97,12 @@ public class ApiService {
     }
 
     private void initHttpServer() {
+        int bindPort = Configuration.getInstance().get(KEY_HTTP_SERVER_BIND_PORT, DEFAULT_HTTP_SERVER_BIND_PORT);
         try {
-            HttpServer server = HttpServer.create(new InetSocketAddress(BIND_PORT),
+            HttpServer server = HttpServer.create(new InetSocketAddress(bindPort),
                     Configuration.getInstance().get(KEY_API_BACKLOG_SIZE, DEFAULT_API_BACKLOG_SIZE));
             server.setExecutor(Executors.newFixedThreadPool(
-                    Configuration.getInstance().get(KEY_API_POOL_SIZE, DEFAULT_POOL_SIZE)));
+                    Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE)));
             server.createContext(Configuration.getInstance().get(KEY_API_DAY_PATH, DEFAULT_API_DAY_PATH),
                     new AuditHandler(DAY));
             server.createContext(Configuration.getInstance().get(KEY_API_HOUR_PATH, DEFAULT_API_HOUR_PATH),
@@ -113,15 +114,19 @@ public class ApiService {
             server.createContext(Configuration.getInstance().get(KEY_API_GET_IPS_PATH, DEFAULT_API_GET_IPS_PATH),
                     new AuditHandler(GET_IPS));
             server.start();
+            LOGGER.info("Init http server success. Bind port is: {}", bindPort);
         } catch (Exception e) {
             LOGGER.error("Init http server has exception!", e);
         }
     }
 
-    static class AuditHandler implements HttpHandler, AutoCloseable {
+    class AuditHandler implements HttpHandler, AutoCloseable {
 
         private final ApiType apiType;
         private final RateLimiter limiter;
+        private final ExecutorService executorService =
+                Executors.newFixedThreadPool(
+                        Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE));
 
         public AuditHandler(ApiType apiType) {
             this.apiType = apiType;
@@ -134,26 +139,32 @@ public class ApiService {
             if (null != limiter) {
                 limiter.acquire();
             }
+            executorService.execute(new Runnable() {
 
-            try (OutputStream os = exchange.getResponseBody()) {
-                JsonObject responseJson = new JsonObject();
-
-                Map<String, String> params = parseRequestURI(exchange.getRequestURI().getQuery());
-                if (checkNecessaryParams(params)) {
-                    handleLegalParams(responseJson, params);
-                } else {
-                    handleInvalidParams(responseJson, exchange);
-                }
+                @Override
+                public void run() {
+                    try (OutputStream os = exchange.getResponseBody()) {
+                        JsonObject responseJson = new JsonObject();
+                        Map<String, String> params = parseRequestURI(exchange.getRequestURI().getQuery());
+                        if (checkNecessaryParams(params)) {
+                            handleLegalParams(responseJson, params);
+                        } else {
+                            handleInvalidParams(responseJson, exchange);
+                        }
 
-                byte[] bytes = responseJson.toString().getBytes(StandardCharsets.UTF_8);
+                        byte[] bytes = responseJson.toString().getBytes(StandardCharsets.UTF_8);
 
-                exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
-                        VALUE_HTTP_HEADER_CONTENT_TYPE);
-                exchange.sendResponseHeaders(HTTP_RESPOND_CODE, bytes.length);
-                os.write(bytes);
-            } catch (Exception e) {
-                LOGGER.error("Audit handler has exception!", e);
-            }
+                        exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
+                                VALUE_HTTP_HEADER_CONTENT_TYPE);
+                        exchange.sendResponseHeaders(HTTP_RESPOND_CODE, bytes.length);
+                        os.write(bytes);
+                    } catch (Exception e) {
+                        LOGGER.error("Audit handler has exception!", e);
+                    } finally {
+                        exchange.close();
+                    }
+                }
+            });
         }
 
         private Map<String, String> parseRequestURI(String query) {
@@ -209,42 +220,47 @@ public class ApiService {
 
         private void handleLegalParams(JsonObject responseJson, Map<String, String> params) {
             List<StatData> statData = null;
-            switch (apiType) {
-                case MINUTES:
-                    statData = handleMinutesApi(params);
-                    break;
-                case HOUR:
-                    String cacheKey =
-                            CacheUtils.buildCacheKey(params.get(PARAMS_START_TIME), params.get(PARAMS_INLONG_GROUP_Id),
-                                    params.get(PARAMS_INLONG_STREAM_Id), params.get(PARAMS_AUDIT_ID),
-                                    params.get(PARAMS_AUDIT_TAG));
-                    statData = HourCache.getInstance().getData(cacheKey);
-                    break;
-                case DAY:
-                    statData = DayCache.getInstance().getData(
-                            params.get(PARAMS_START_TIME),
-                            params.get(PARAMS_END_TIME),
-                            params.get(PARAMS_INLONG_GROUP_Id),
-                            params.get(PARAMS_INLONG_STREAM_Id),
-                            params.get(PARAMS_AUDIT_ID));
-                    break;
-                case GET_IDS:
-                    statData = RealTimeQuery.getInstance().queryIdsByIp(
-                            params.get(PARAMS_START_TIME),
-                            params.get(PARAMS_END_TIME),
-                            params.get(PARAMS_IP),
-                            params.get(PARAMS_AUDIT_ID));
-                    break;
-                case GET_IPS:
-                    statData = RealTimeQuery.getInstance().queryIpsById(
-                            params.get(PARAMS_START_TIME),
-                            params.get(PARAMS_END_TIME),
-                            params.get(PARAMS_INLONG_GROUP_Id),
-                            params.get(PARAMS_INLONG_STREAM_Id),
-                            params.get(PARAMS_AUDIT_ID));
-                    break;
-                default:
-                    LOGGER.error("Unsupported interface type! type is {}", apiType);
+            try {
+                switch (apiType) {
+                    case MINUTES:
+                        statData = handleMinutesApi(params);
+                        break;
+                    case HOUR:
+                        statData = HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
+                                params.get(PARAMS_END_TIME),
+                                params.get(PARAMS_INLONG_GROUP_Id),
+                                params.get(PARAMS_INLONG_STREAM_Id),
+                                params.get(PARAMS_AUDIT_ID),
+                                params.get(PARAMS_AUDIT_TAG));
+                        break;
+                    case DAY:
+                        statData = DayCache.getInstance().getData(
+                                params.get(PARAMS_START_TIME),
+                                params.get(PARAMS_END_TIME),
+                                params.get(PARAMS_INLONG_GROUP_Id),
+                                params.get(PARAMS_INLONG_STREAM_Id),
+                                params.get(PARAMS_AUDIT_ID));
+                        break;
+                    case GET_IDS:
+                        statData = RealTimeQuery.getInstance().queryIdsByIp(
+                                params.get(PARAMS_START_TIME),
+                                params.get(PARAMS_END_TIME),
+                                params.get(PARAMS_IP),
+                                params.get(PARAMS_AUDIT_ID));
+                        break;
+                    case GET_IPS:
+                        statData = RealTimeQuery.getInstance().queryIpsById(
+                                params.get(PARAMS_START_TIME),
+                                params.get(PARAMS_END_TIME),
+                                params.get(PARAMS_INLONG_GROUP_Id),
+                                params.get(PARAMS_INLONG_STREAM_Id),
+                                params.get(PARAMS_AUDIT_ID));
+                        break;
+                    default:
+                        LOGGER.error("Unsupported interface type! type is {}", apiType);
+                }
+            } catch (Exception exception) {
+                LOGGER.error("Handle legal params has exception ", exception);
             }
 
             if (null == statData)
@@ -257,9 +273,6 @@ public class ApiService {
         }
 
         private List<StatData> handleMinutesApi(Map<String, String> params) {
-            String cacheKey = CacheUtils.buildCacheKey(params.get(PARAMS_START_TIME),
-                    params.get(PARAMS_INLONG_GROUP_Id),
-                    params.get(PARAMS_INLONG_STREAM_Id), params.get(PARAMS_AUDIT_ID), params.get(PARAMS_AUDIT_TAG));
             int cycle = Integer.parseInt(params.get(PARAMS_AUDIT_CYCLE));
             List<StatData> statData = null;
             switch (AuditCycle.fromInt(cycle)) {
@@ -271,10 +284,18 @@ public class ApiService {
                             params.get(PARAMS_AUDIT_ID));
                     break;
                 case MINUTE_10:
-                    statData = TenMinutesCache.getInstance().getData(cacheKey);
+                    statData = TenMinutesCache.getInstance().getData(params.get(PARAMS_START_TIME),
+                            params.get(PARAMS_END_TIME),
+                            params.get(PARAMS_INLONG_GROUP_Id),
+                            params.get(PARAMS_INLONG_STREAM_Id), params.get(PARAMS_AUDIT_ID),
+                            params.get(PARAMS_AUDIT_TAG));
                     break;
                 case MINUTE_30:
-                    statData = HalfHourCache.getInstance().getData(cacheKey);
+                    statData = HalfHourCache.getInstance().getData(params.get(PARAMS_START_TIME),
+                            params.get(PARAMS_END_TIME),
+                            params.get(PARAMS_INLONG_GROUP_Id),
+                            params.get(PARAMS_INLONG_STREAM_Id), params.get(PARAMS_AUDIT_ID),
+                            params.get(PARAMS_AUDIT_TAG));
                     break;
                 default:
                     LOGGER.error("Unsupported cycle type! cycle is {}", cycle);
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index 0e0b7ff4d0..95e1cddd75 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -26,29 +26,22 @@ import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.JdbcConfig;
 import org.apache.inlong.audit.entities.SinkConfig;
 import org.apache.inlong.audit.entities.SourceConfig;
-import org.apache.inlong.audit.selector.api.Selector;
-import org.apache.inlong.audit.selector.api.SelectorConfig;
-import org.apache.inlong.audit.selector.api.SelectorFactory;
 import org.apache.inlong.audit.sink.CacheSink;
 import org.apache.inlong.audit.sink.JdbcSink;
 import org.apache.inlong.audit.source.JdbcSource;
 import org.apache.inlong.audit.utils.JdbcUtils;
-import org.apache.inlong.common.util.NetworkUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
-import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
 import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
-import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS;
 import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
 import static org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
 import static org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
@@ -77,11 +70,8 @@ public class EtlService {
     private CacheSink cacheSinkOfTenMinutesCache;
     private CacheSink cacheSinkOfHalfHourCache;
     private CacheSink cacheSinkOfHourCache;
-
     private final int queueSize;
     private final int statBackTimes;
-    private static Selector selector;
-    private boolean running = true;
     private final String serviceId;
 
     public EtlService() {
@@ -100,9 +90,6 @@ public class EtlService {
         mysqlToTenMinutesCache();
         mysqlToHalfHourCache();
         mysqlToHourCache();
-
-        initSelector();
-        waitToBeLeader();
     }
 
     /**
@@ -166,7 +153,7 @@ public class EtlService {
      * The default audit data cycle is 5 minutes,and stored in a temporary table.
      * Support multiple audit source clusters.
      */
-    private void auditSourceToMysql() {
+    public void auditSourceToMysql() {
         DataQueue dataQueue = new DataQueue(queueSize);
         List<JdbcConfig> sourceList = ConfigService.getInstance().getAuditSourceByServiceId(serviceId);
         for (JdbcConfig jdbcConfig : sourceList) {
@@ -229,64 +216,22 @@ public class EtlService {
                 jdbcConfig.getDriverClass(),
                 jdbcConfig.getJdbcUrl(),
                 jdbcConfig.getUserName(),
-                jdbcConfig.getPassword());
-    }
-
-    /**
-     * Init selector
-     */
-    private void initSelector() {
-        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
-        String leaderId = NetworkUtils.getLocalIp() + "-" + UUID.randomUUID();
-        LOGGER.info("Init selector. Leader id is :{}", leaderId);
-        if (selector == null) {
-            SelectorConfig electorConfig = new SelectorConfig(
-                    serviceId,
-                    leaderId,
-                    jdbcConfig.getJdbcUrl(),
-                    jdbcConfig.getUserName(), jdbcConfig.getPassword(), jdbcConfig.getDriverClass());
-
-            selector = SelectorFactory.getNewElector(electorConfig);
-            try {
-                selector.init();
-            } catch (Exception e) {
-                LOGGER.error("Init selector has exception:", e);
-            }
-        }
-    }
-
-    /**
-     * Wait to be leader
-     */
-    public void waitToBeLeader() {
-        while (running) {
-            try {
-                Thread.sleep(Configuration.getInstance().get(KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS,
-                        DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS));
-            } catch (Exception e) {
-                LOGGER.error("Wait to be Leader has exception! lost Leadership!", e);
-            }
-
-            if (selector.isLeader()) {
-                LOGGER.info("I get Leadership! Begin to aggregate clickhouse data to mysql");
-                auditSourceToMysql();
-                return;
-            }
-        }
+                jdbcConfig.getPassword(),
+                true);
     }
 
     /**
      * Stop the etl service,and destroy related resources.
      */
     public void stop() {
-        running = false;
         mysqlSourceOfTemp.destroy();
         mysqlSinkOfDay.destroy();
 
         for (JdbcSource source : auditJdbcSources) {
             source.destroy();
         }
-        mysqlSinkOfTemp.destroy();
+        if (null != mysqlSinkOfTemp)
+            mysqlSinkOfTemp.destroy();
 
         mysqlSourceOfTenMinutesCache.destroy();
         mysqlSourceOfHalfHourCache.destroy();
@@ -295,7 +240,5 @@ public class EtlService {
         cacheSinkOfTenMinutesCache.destroy();
         cacheSinkOfHalfHourCache.destroy();
         cacheSinkOfHourCache.destroy();
-
-        selector.close();
     }
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index 6f308ac3a9..db3e76a143 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -137,7 +137,7 @@ public class JdbcSink implements AutoCloseable {
         HikariConfig config = new HikariConfig();
         config.setDriverClassName(sinkConfig.getDriverClassName());
         config.setJdbcUrl(sinkConfig.getJdbcUrl());
-        config.setUsername(sinkConfig.getUsername());
+        config.setUsername(sinkConfig.getUserName());
         config.setPassword(sinkConfig.getPassword());
         config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
                 DEFAULT_CONNECTION_TIMEOUT));
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index 4be0e7cb22..5964f8f84f 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
 import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
@@ -82,8 +83,6 @@ public class JdbcSource {
     private DataSource dataSource;
     private String querySql;
     private SourceConfig sourceConfig;
-
-    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
     private static final int MAX_MINUTE = 60;
 
     public JdbcSource(DataQueue dataQueue, SourceConfig sourceConfig) {
@@ -281,6 +280,11 @@ public class JdbcSource {
                 pstat.setString(1, startTime);
                 pstat.setString(2, endTime);
                 pstat.setString(3, auditId);
+                if (sourceConfig.isNeedJoin()) {
+                    pstat.setString(4, startTime);
+                    pstat.setString(5, endTime);
+                    pstat.setString(6, auditId);
+                }
                 try (ResultSet resultSet = pstat.executeQuery()) {
                     while (resultSet.next()) {
                         StatData data = new StatData();

Reply via email to