This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 47f33c3ec4 [INLONG-11406][Audit] Provides an interface for 
asynchronously flushing Audit data (#11409)
47f33c3ec4 is described below

commit 47f33c3ec4415367feb86b3f2c7cad2e8649616a
Author: doleyzi <[email protected]>
AuthorDate: Fri Oct 25 09:50:07 2024 +0800

    [INLONG-11406][Audit] Provides an interface for asynchronously flushing 
Audit data (#11409)
---
 .../inlong/audit/utils/NamedThreadFactory.java     | 36 +++++++++++++++
 .../org/apache/inlong/audit/AuditReporterImpl.java | 52 +++++++++++++++++++---
 .../org/apache/inlong/audit/send/ProxyManager.java |  7 ++-
 .../apache/inlong/audit/send/SenderManager.java    |  4 +-
 4 files changed, 89 insertions(+), 10 deletions(-)

diff --git 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java
new file mode 100644
index 0000000000..cd751ec03c
--- /dev/null
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NamedThreadFactory implements ThreadFactory {
+
+    private final String baseName;
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    public NamedThreadFactory(String baseName) {
+        this.baseName = baseName;
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        return new Thread(runnable, baseName + "-Thread-" + 
counter.getAndIncrement());
+    }
+}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 7d3f1c5755..bd2c5aae47 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -32,6 +32,7 @@ import org.apache.inlong.audit.util.AuditValues;
 import org.apache.inlong.audit.util.Config;
 import org.apache.inlong.audit.util.RequestIdUtils;
 import org.apache.inlong.audit.util.StatInfo;
+import org.apache.inlong.audit.utils.NamedThreadFactory;
 
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -76,7 +77,8 @@ public class AuditReporterImpl implements Serializable {
     private final ConcurrentHashMap<Long, HashSet<String>> expiredKeyList = 
new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Long, Long> flushTime = new 
ConcurrentHashMap<>();
     private final Config config = new Config();
-    private final ScheduledExecutorService timeoutExecutor = 
Executors.newSingleThreadScheduledExecutor();
+    private final ScheduledExecutorService timerExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("inlong-audit-flush"));
     private int packageId = 1;
     private int dataId = 0;
     private volatile boolean initialized = false;
@@ -86,6 +88,7 @@ public class AuditReporterImpl implements Serializable {
     private SocketAddressListLoader loader = null;
     private int flushStatThreshold = 100;
     private boolean autoFlush = true;
+    private boolean enableDebug = false;
 
     private AuditMetric auditMetric = new AuditMetric();
 
@@ -108,6 +111,14 @@ public class AuditReporterImpl implements Serializable {
         this.autoFlush = autoFlush;
     }
 
+    /**
+     * Debug mode supports printing audit details in the log
+     * @param enableDebug
+     */
+    public void setEnableDebug(boolean enableDebug) {
+        this.enableDebug = enableDebug;
+    }
+
     /**
      * Init
      */
@@ -116,7 +127,7 @@ public class AuditReporterImpl implements Serializable {
             return;
         }
         config.init();
-        timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
+        timerExecutor.scheduleWithFixedDelay(new Runnable() {
 
             @Override
             public void run() {
@@ -301,6 +312,22 @@ public class AuditReporterImpl implements Serializable {
         stat.delay.addAndGet(delayTime);
     }
 
+    /**
+     * Asynchronously flush audit data
+     * @param isolateKey
+     */
+    public synchronized void asyncFlush(long isolateKey) {
+        LOGGER.info("Async flush audit by isolate key: {} ", isolateKey);
+        Runnable task = () -> {
+            try {
+                flush(isolateKey);
+            } catch (Exception e) {
+                LOGGER.error("Async flush audit by isolate key: {}, has 
exception: ", isolateKey, e);
+            }
+        };
+        timerExecutor.schedule(task, 0, TimeUnit.MILLISECONDS);
+    }
+
     /**
      * Flush audit data by default audit version
      */
@@ -314,10 +341,12 @@ public class AuditReporterImpl implements Serializable {
     public synchronized void flush(long isolateKey) {
         if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) != 
null
                 || flushStat.addAndGet(1) > flushStatThreshold) {
+            LOGGER.info("Skip audit flush isolate key: {}, last flush time: 
{}, count: {}", isolateKey,
+                    flushTime.get(isolateKey), flushStat.get());
             return;
         }
         long startTime = System.currentTimeMillis();
-        LOGGER.info("Audit flush isolate key {} ", isolateKey);
+        LOGGER.info("Audit flush isolate key: {} ", isolateKey);
 
         try {
             manager.checkFailedData();
@@ -444,7 +473,7 @@ public class AuditReporterImpl implements Serializable {
         while (iterator.hasNext()) {
             Map.Entry<Long, HashSet<String>> entry = iterator.next();
             if (entry.getValue().isEmpty()) {
-                LOGGER.info("Remove the key of expired key list: {},isolate 
key: {}", entry.getKey(), isolateKey);
+                LOGGER.info("Remove the key of expired key list: {}, isolate 
key: {}", entry.getKey(), isolateKey);
                 iterator.remove();
                 continue;
             }
@@ -528,17 +557,20 @@ public class AuditReporterImpl implements Serializable {
             if (dataId++ >= BATCH_NUM) {
                 dataId = 0;
                 packageId++;
-                sendData(requestBuild);
+                sendData(requestBuild, isolateKey);
             }
         }
 
         if (requestBuild.getMsgBodyCount() > 0) {
-            sendData(requestBuild);
+            sendData(requestBuild, isolateKey);
         }
         summaryStatMap.get(isolateKey).clear();
     }
 
-    private void sendData(AuditApi.AuditRequest.Builder requestBuild) {
+    private void sendData(AuditApi.AuditRequest.Builder requestBuild, long 
isolateKey) {
+        if (enableDebug) {
+            LOGGER.info("Send audit data by isolate key: {}, data: {}", 
isolateKey, requestBuild);
+        }
         requestBuild.setRequestId(RequestIdUtils.nextRequestId());
         sendByBaseCommand(requestBuild.build());
         auditMetric.addTotalMsg(requestBuild.getMsgBodyCount());
@@ -554,6 +586,7 @@ public class AuditReporterImpl implements Serializable {
         flushTime.forEach((key, value) -> {
             if ((currentTime - value) > PERIOD) {
                 flushTime.remove(key);
+                LOGGER.info("Remove audit flush limitation. isolate key: {}, 
flush time: {}", key, value);
             }
         });
     }
@@ -649,4 +682,9 @@ public class AuditReporterImpl implements Serializable {
     public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
         SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
     }
+
+    public void shutdown() {
+        ProxyManager.getInstance().shutdown();
+        timerExecutor.shutdown();
+    }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
index 127609d2ab..b87c563da3 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.entity.AuditComponent;
 import org.apache.inlong.audit.entity.AuditProxy;
 import org.apache.inlong.audit.entity.CommonResponse;
 import org.apache.inlong.audit.utils.HttpUtils;
+import org.apache.inlong.audit.utils.NamedThreadFactory;
 import org.apache.inlong.audit.utils.ThreadUtils;
 
 import org.slf4j.Logger;
@@ -41,7 +42,8 @@ public class ProxyManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyManager.class);
     private static final ProxyManager instance = new ProxyManager();
     private final List<String> currentIpPorts = new CopyOnWriteArrayList<>();
-    private final ScheduledExecutorService timer = 
Executors.newSingleThreadScheduledExecutor();
+    private final ScheduledExecutorService timer =
+            Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("inlong-audit-proxy-manager"));
     private final static String GET_AUDIT_PROXY_API_PATH = 
"/inlong/manager/openapi/audit/getAuditProxy";
     private int timeoutMs = 10000;
     private int updateInterval = 60000;
@@ -167,4 +169,7 @@ public class ProxyManager {
         }
         return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
     }
+    public void shutdown() {
+        timer.shutdown();
+    }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index f941cbae6d..b795b7aa98 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -80,12 +80,12 @@ public class SenderManager {
             try {
                 InetSocketAddress inetSocketAddress = 
ProxyManager.getInstance().getInetSocketAddress();
                 if (inetSocketAddress == null) {
-                    LOGGER.error("Audit inet socket address is null!");
+                    LOGGER.error("Audit proxy address is null!");
                     return false;
                 }
                 reconnect(inetSocketAddress, auditConfig.getSocketTimeout());
             } catch (IOException exception) {
-                LOGGER.error("Connect to {} has exception!", 
socket.getInetAddress(), exception);
+                LOGGER.error("Connect to audit proxy {} has exception!", 
socket.getInetAddress(), exception);
                 return false;
             }
         }

Reply via email to