This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f45f26bbe2 [INLONG-10542][Agent] Remove the deleted watch directions (#10544)
f45f26bbe2 is described below
commit f45f26bbe255f1ab525be68f453c350b56e84004
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Jul 1 21:16:50 2024 +0800
[INLONG-10542][Agent] Remove the deleted watch directions (#10544)
* [INLONG-10542][Agent] Remove the deleted watch dirs
* INLONG-10542][Agent] Remove the deleted watch dirs
---
.../inlong/agent/plugin/task/file/LogFileTask.java | 1 +
.../inlong/agent/plugin/task/file/WatchEntity.java | 86 ++++++----------------
2 files changed, 25 insertions(+), 62 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 44cff75f2f..fbee956b0f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -425,6 +425,7 @@ public class LogFileTask extends AbstractTask {
return;
}
try {
+ entity.removeDeletedWatchDir();
/* Get all creation events until all events are consumed. */
for (int i = 0; i < entity.getTotalPathSize(); i++) {
// maybe the watchService is closed ,but we catch this
exception!
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
index a62a602784..af6d018a1d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
-import org.apache.inlong.agent.utils.DateTransUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,6 @@ import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -46,6 +45,10 @@ import java.util.regex.Pattern;
public class WatchEntity {
private static final Logger logger =
LoggerFactory.getLogger(WatchEntity.class);
+ // watch 1 dir per hour, clean it every year,
+ // if 100 bytes per dir, it will occupy 876k at most
+ public static final int CLEAN_WATCH_DIR_WATER_LVL = 24 * 365;
+ public static final long CHECK_WATCH_DIR_INTERVAL_MS = 1000 * 60 * 5;
private WatchService watchService;
private final String basicStaticPath;
private final String originPattern;
@@ -59,6 +62,7 @@ public class WatchEntity {
private final Map<String, WatchKey> pathToKeys = new
ConcurrentHashMap<String, WatchKey>();
private final String dirSeparator = System.getProperty("file.separator");
private String cycleUnit;
+ private long lastCheckTime;
public WatchEntity(WatchService watchService,
String originPattern,
@@ -88,7 +92,7 @@ public class WatchEntity {
public String toString() {
return "WatchEntity [parentPathName=" + basicStaticPath
+ ", readFilePattern=" + regexPattern
- + ", dateExpression=" + dateExpression + ", totalDirPattern="
+ + ", dateExpression=" + dateExpression + ",
originPatternWithoutFileName="
+ originPatternWithoutFileName + ", containRegexPattern="
+ containRegexPattern + ", totalDirRegexPattern="
+ patternWithoutFileName + ", keys=" + keys + ", pathToKeys="
+ pathToKeys
@@ -97,7 +101,8 @@ public class WatchEntity {
private boolean isPathContainRegexPattern() {
if (originPatternWithoutFileName.contains("YYYY") ||
originPatternWithoutFileName.contains("MM")
- || originPatternWithoutFileName.contains("DD") ||
originPatternWithoutFileName.contains("hh")) {
+ || originPatternWithoutFileName.contains("DD") ||
originPatternWithoutFileName.contains("hh")
+ || originPatternWithoutFileName.contains("mm")) {
return true;
}
@@ -168,9 +173,7 @@ public class WatchEntity {
} else {
return;
}
-
logger.info("rootPath len {}",
rootPath.toAbsolutePath().toString().length());
-
registerRecursively(rootPath.toFile(),
rootPath.toAbsolutePath().toString().length() + 1);
}
@@ -261,73 +264,32 @@ public class WatchEntity {
return dateExpression.getPatternPosition();
}
- /*
- * Remove the watched path which is 3 cycle units earlier than current
task data time, this is because JDK7 starts a
- * thread for each watch path, which should consume lots of memory.
- */
- public void removeUselessWatchDirectories(String curDataTime)
- throws Exception {
-
- logger.info("removeUselessWatchDirectories {}", curDataTime);
-
- /* Calculate the data time which is 3 cycle units earlier than current
task data time. */
- long curDataTimeMillis =
DateTransUtils.timeStrConvertToMillSec(curDataTime, cycleUnit);
- Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(curDataTimeMillis);
- if ("D".equalsIgnoreCase(cycleUnit)) {
- calendar.add(Calendar.DAY_OF_YEAR, -3);
- } else if ("h".equalsIgnoreCase(cycleUnit)) {
- calendar.add(Calendar.HOUR_OF_DAY, -3);
- } else if ("10m".equalsIgnoreCase(cycleUnit)) {
- calendar.add(Calendar.MINUTE, -30);
- }
-
- /* Calculate the 3 cycle units earlier date. */
- String year = String.valueOf(calendar.get(Calendar.YEAR));
- String month = String.valueOf(calendar.get(Calendar.MONTH) + 1);
- if (month.length() < 2) {
- month = "0" + month;
- }
- String day = String.valueOf(calendar.get(Calendar.DAY_OF_MONTH));
- if (day.length() < 2) {
- day = "0" + day;
- }
- String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
- if (hour.length() < 2) {
- hour = "0" + hour;
+ public void removeDeletedWatchDir() {
+ long now = AgentUtils.getCurrentTime();
+ if (now - lastCheckTime < CHECK_WATCH_DIR_INTERVAL_MS) {
+ return;
}
- String minute = String.valueOf(calendar.get(Calendar.MINUTE));
- if (minute.length() < 2) {
- minute = "0" + minute;
+ lastCheckTime = now;
+ if (pathToKeys.size() < CLEAN_WATCH_DIR_WATER_LVL) {
+ logger.info("originPattern {} watch dir size {}", originPattern,
pathToKeys.size());
+ return;
+ } else {
+ logger.info("originPattern {} watch dir size {} > {} try to remove
the deleted watch dir", originPattern,
+ pathToKeys.size(), CLEAN_WATCH_DIR_WATER_LVL);
}
-
- /* Replace it with the date and get a specified watch path. */
- String copyDirPattern = new String(originPatternWithoutFileName);
- copyDirPattern = copyDirPattern.replace("YYYY", year);
- copyDirPattern = copyDirPattern.replace("MM", month);
- copyDirPattern = copyDirPattern.replace("DD", day);
- copyDirPattern = copyDirPattern.replace("hh", hour);
- copyDirPattern = copyDirPattern.replace("mm", minute);
-
Set<String> keys = pathToKeys.keySet();
Set<String> tmpKeys = new HashSet<>();
tmpKeys.addAll(keys);
- String rootDir =
Paths.get(basicStaticPath).toAbsolutePath().toString();
for (String path : tmpKeys) {
- /*
- * Remove the watch path whose path is smaller than the 3 cycle
units earlier.
- */
- logger.info("[Path]{} {}", path, copyDirPattern);
- if (path.compareTo(copyDirPattern) < 0 &&
!copyDirPattern.contains(path)) {
+ File folder = new File(path);
+ if (!folder.isDirectory()) {
WatchKey key = pathToKeys.get(path);
key.cancel();
-
pathToKeys.remove(path);
-
- logger.info("Watch path: {} is too old for data time: {}, we
should remove", path,
- curDataTime);
+ logger.info("path: {} is deleted we should remove the watch",
path);
}
}
+ logger.info("pathToKeys size {} after remove", pathToKeys.size());
}
public void clearPathToKeys() {