This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 119240cdbc [INLONG-9143][Agent] Add log file collect task (#9145)
119240cdbc is described below
commit 119240cdbc806cb0f98058625f460809b9b69ed5
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Oct 30 11:10:55 2023 +0800
[INLONG-9143][Agent] Add log file collect task (#9145)
* [INLONG-9143][Agent] Add log file collect task
* [INLONG-9143][Agent] Add log file collect task
fix ut failed
* [INLONG-9143][Agent] Add log file collect task
Try to fix check failed
* [INLONG-9143][Agent] Add log file collect task
fix ut error
---
.../apache/inlong/agent/plugin/task/CronTask.java | 58 +++
.../plugin/task/FormatDateLogFileCollectTask.java | 27 ++
.../inlong/agent/plugin/task/PathPattern.java | 181 ++++++++
.../agent/plugin/task/filecollect/AgentErrMsg.java | 67 +++
.../agent/plugin/task/filecollect/FileScanner.java | 174 +++++++
.../task/filecollect/LogFileCollectTask.java | 501 +++++++++++++++++++++
.../agent/plugin/task/filecollect/TaskType.java | 39 ++
.../agent/plugin/task/filecollect/WatchEntity.java | 358 +++++++++++++++
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 50 +-
.../agent/plugin/task/MockInstanceManager.java | 24 +
.../agent/plugin/task/TestLogfileCollectTask.java | 119 +++++
.../src/test/resources/test/20230928_1.txt | 3 +
12 files changed, 1593 insertions(+), 8 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
new file mode 100644
index 0000000000..0216eb96c6
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.file.Task;
+
+/**
+ * Generate job by crontab expression.
+ */
+public class CronTask extends Task {
+
+ @Override
+ public void init(Object srcManager, TaskProfile profile, Db basicDb) {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public TaskProfile getProfile() {
+ return null;
+ }
+
+ @Override
+ public String getTaskId() {
+ return null;
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java
new file mode 100644
index 0000000000..b8837fc0df
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
+
+/**
+ * Directory trigger with format date.
+ */
+public class FormatDateLogFileCollectTask extends LogFileCollectTask {
+
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java
new file mode 100644
index 0000000000..137b0a98de
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.plugin.filter.DateFormatRegex;
+import org.apache.inlong.agent.utils.PathUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Path pattern for file filter.
+ * It’s identified by watchDir, which matches {@link PathPattern#whiteList}.
+ */
+public class PathPattern {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PathPattern.class);
+
+ private final String rootDir;
+ private final Set<String> subDirs;
+ // regex for those files should be matched
+ private final Set<DateFormatRegex> whiteList;
+
+ public PathPattern(String rootDir, Set<String> whiteList) {
+ this(rootDir, whiteList, null);
+ }
+
+ public PathPattern(String rootDir, Set<String> whiteList, String offset) {
+ this.rootDir = rootDir;
+ this.subDirs = new HashSet<>();
+ if (offset != null && StringUtils.isNotBlank(offset)) {
+ this.whiteList = whiteList.stream()
+ .map(whiteRegex ->
DateFormatRegex.ofRegex(whiteRegex).withOffset(offset))
+ .collect(Collectors.toSet());
+ updateDateFormatRegex();
+ } else {
+ this.whiteList = whiteList.stream()
+ .map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex))
+ .collect(Collectors.toSet());
+ }
+ }
+
+ public static Set<PathPattern> buildPathPattern(Set<String> whiteList,
+ String offset) {
+ Set<String> commonWatchDir = PathUtils.findCommonRootPath(whiteList);
+ return commonWatchDir.stream().map(rootDir -> {
+ Set<String> commonWatchDirWhiteList =
+ whiteList.stream()
+ .filter(whiteRegex ->
whiteRegex.startsWith(rootDir))
+ .collect(Collectors.toSet());
+ return new PathPattern(rootDir, commonWatchDirWhiteList, offset);
+ }).collect(Collectors.toSet());
+ }
+
+ /**
+ * cleanup local cache, subDirs is only used to filter duplicated
directories
+ * in one term watch key check.
+ */
+ public void cleanup() {
+ subDirs.clear();
+ }
+
+ /**
+ * Research all children files with {@link PathPattern#rootDir} matched
whiteList and filtered by blackList.
+ *
+ * @param maxNum
+ * @return
+ */
+ public Collection<File> walkSuitableFiles(int maxNum) {
+ Collection<File> suitableFiles = new ArrayList<>();
+ walkSuitableFiles(suitableFiles, new File(rootDir), maxNum);
+ return suitableFiles;
+ }
+
+ private void walkSuitableFiles(Collection<File> suitableFiles, File file,
int maxNum) {
+ if (suitableFiles.size() > maxNum) {
+ LOGGER.warn("Suitable files exceed max num {}, just return.",
maxNum);
+ return;
+ }
+
+ if (suitable(file.getAbsolutePath())) {
+ if (file.isFile()) {
+ suitableFiles.add(file);
+ } else if (file.isDirectory()) {
+ Stream.of(file.listFiles()).forEach(subFile ->
walkSuitableFiles(suitableFiles, subFile, maxNum));
+ }
+ }
+ }
+
+ /**
+ * Check whether path is suitable for match whiteList and filtered by
blackList
+ *
+ * @param path pathString
+ * @return true if suit else false.
+ */
+ public boolean suitable(String path) {
+ // remove common root path
+ String briefSubDir = StringUtils.substringAfter(path, rootDir);
+ // if already watched, then stop deep find
+ if (subDirs.contains(briefSubDir)) {
+ LOGGER.info("already watched {}", path);
+ return false;
+ }
+
+ subDirs.add(briefSubDir);
+ File file = new File(path);
+ return whiteList.stream()
+ .filter(whiteRegex -> whiteRegex.match(file))
+ .findAny()
+ .isPresent();
+ }
+
+ /**
+ * when a new file is found, update regex since time may change.
+ */
+ public void updateDateFormatRegex() {
+ whiteList.forEach(DateFormatRegex::setRegexWithCurrentTime);
+ }
+
+ /**
+ * when job is retry job, the time for searching file should be specified.
+ */
+ public void updateDateFormatRegex(String time) {
+ whiteList.forEach(whiteRegex -> whiteRegex.setRegexWithTime(time));
+ }
+
+ @Override
+ public String toString() {
+ return rootDir;
+ }
+
+ @Override
+ public int hashCode() {
+ return HashCodeBuilder.reflectionHashCode(rootDir, false);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof PathPattern) {
+ PathPattern entity = (PathPattern) object;
+ return entity.rootDir.equals(this.rootDir);
+ } else {
+ return false;
+ }
+ }
+
+ public String getRootDir() {
+ return rootDir;
+ }
+
+ public String getSuitTime() {
+ // todo: Adapt to datetime in the case of multiple regex
+ return whiteList.stream().findAny().get().getFormattedTime();
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
new file mode 100644
index 0000000000..eff927736d
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
/**
 * Error-message prefixes written into agent logs. Each value has the shape
 * {@code ERROR-<n>-TDAgent|<code>|<level>|<name>|}; callers append the detail text to it.
 */
public class AgentErrMsg {

    /** Configuration applied successfully. */
    public static final String CONFIG_SUCCESS = "SUCCESS";

    /** Data source configuration error. */
    public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR|ERROR_DATA_SOURCE_CONFIG|";

    /** The directory to watch does not exist. */
    public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-TDAgent|11001|WARN|WARN_DIRECTORY_NOT_EXIST|";

    /** Error while watching a directory. */
    public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR|ERROR_WATCH_DIR_ERROR|";

    /** The file to read is abnormal (missing or rotated). */
    public static final String FILE_ERROR = "ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|";

    /** Error while reading a file. */
    public static final String FILE_OP_ERROR = "ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|";

    /** Disk is full. */
    public static final String DISK_FULL = "ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|";

    /** Out of memory. */
    public static final String OOM_ERROR = "ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|";

    /** A file watcher became invalid. */
    public static final String WATCHER_INVALID = "ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|";

    /** Cannot connect to the manager (TDManager). */
    public static final String CONNECT_TDM_ERROR = "ERROR-1-TDAgent|30002|ERROR|ERROR_CANNOT_CONNECT_TO_TDM|";

    /** Failed to send data to the bus (TDBus). */
    public static final String SEND_TO_BUS_ERROR = "ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|";

    /** BDB operation failed. */
    public static final String BDB_ERROR = "ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|";

    /** Internal message buffer is full. */
    public static final String MSG_BUFFER_FULL = "ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|";

    /** A watched event is invalid (the task has already been deleted). */
    public static final String FOUND_EVENT_INVALID = "ERROR-1-TDAgent|30003|ERROR|FOUND_EVENT_INVALID|";
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
new file mode 100644
index 0000000000..46896d6cdc
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
+import org.apache.inlong.agent.plugin.utils.file.Files;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/*
+ * This class is mainly used for scanning log file that we want to read. We
use this class at
+ * tdagent recover process, the do and redo tasks and the current log file
access when we deploy a
+ * new data source.
+ */
+public class FileScanner {
+
+ public static class BasicFileInfo {
+
+ public String fileName;
+ public String dataTime;
+
+ public BasicFileInfo(String fileName, String dataTime) {
+ this.fileName = fileName;
+ this.dataTime = dataTime;
+ }
+
+ }
+
+ private static final Logger logger =
LoggerFactory.getLogger(FileScanner.class);
+
+ public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf,
String originPattern, long failTime,
+ long recoverTime, boolean isRetry) {
+ String cycleUnit = conf.getCycleUnit();
+ if (!isRetry) {
+ failTime -= NewDateUtils.caclOffset(conf.getTimeOffset());
+ recoverTime -= NewDateUtils.caclOffset(conf.getTimeOffset());
+ }
+
+ String startTime = NewDateUtils.millSecConvertToTimeStr(failTime,
cycleUnit);
+ String endTime = NewDateUtils.millSecConvertToTimeStr(recoverTime,
cycleUnit);
+ logger.info("task {} this scan time is between {} and {}.",
+ new Object[]{conf.getTaskId(), startTime, endTime});
+
+ return scanTaskBetweenTimes(conf, originPattern, startTime, endTime);
+ }
+
+ /* Scan log files and create tasks between two times. */
+ public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf,
String originPattern, String startTime,
+ String endTime) {
+ String cycleUnit = conf.getCycleUnit();
+ int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
+ List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime,
cycleUnit);
+ List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>();
+ for (Long time : dateRegion) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(time);
+ String filename = NewDateUtils.replaceDateExpression(calendar,
originPattern);
+ ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
+ String firstDir = allPaths.get(0);
+ String secondDir = allPaths.get(0) + File.separator +
allPaths.get(1);
+ ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir,
secondDir, filename, 3,
+ maxFileNum);
+ for (String file : fileList) {
+ // TODO the time is not YYYYMMDDHH
+ String dataTime = NewDateUtils.millSecConvertToTimeStr(time,
cycleUnit);
+ BasicFileInfo info = new BasicFileInfo(file, dataTime);
+ logger.info("scan new task fileName {} ,dataTime {}", file,
+ NewDateUtils.millSecConvertToTimeStr(time, cycleUnit));
+ infos.add(info);
+ }
+ }
+ return infos;
+ }
+
+ public static ArrayList<String> scanFile(TaskProfile conf, String
originPattern, long dataTime) {
+ int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(dataTime);
+
+ String filename = NewDateUtils.replaceDateExpression(calendar,
originPattern);
+ ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
+ String firstDir = allPaths.get(0);
+ String secondDir = allPaths.get(0) + File.separator + allPaths.get(1);
+ return getUpdatedOrNewFiles(firstDir, secondDir, filename, 3,
maxFileNum);
+ }
+
+ private static ArrayList<String> getUpdatedOrNewFiles(String firstDir,
String secondDir,
+ String fileName, long depth, int maxFileNum) {
+
+ // logger.info("getUpdatedOrNewFiles: firstdir: {}, seconddir: {}
filename: {}",
+ // new Object[]{firstDir, secondDir, fileName});
+
+ ArrayList<String> ret = new ArrayList<String>();
+ ArrayList<File> readyFiles = new ArrayList<File>();
+
+ if (!new File(firstDir).isDirectory()) {
+ return ret;
+ }
+
+ for (File pathname : Files.find(firstDir).yieldFilesAndDirectories()
+ .recursive().withDepth((int) depth).withDirNameRegex(secondDir)
+ .withFileNameRegex(fileName)) {
+ if (readyFiles.size() >= maxFileNum) {
+ break;
+ }
+ readyFiles.add(pathname);
+ }
+ // sort by last-modified time (older -> newer)
+ Collections.sort(readyFiles, new FileTimeComparator());
+ for (File f : readyFiles) {
+ // System.out.println(f.getAbsolutePath());
+ ret.add(f.getAbsolutePath());
+ }
+ return ret;
+ }
+
+ @SuppressWarnings("unused")
+ private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
+ int maxFileNum) {
+ ArrayList<String> ret = new ArrayList<String>();
+ ArrayList<String> directories = FilePathUtil
+ .getDirectoryLayers(logFileName);
+ String parentDir = directories.get(0) + File.separator
+ + directories.get(1);
+
+ Pattern pattern = Pattern.compile(directories.get(2),
+ Pattern.CASE_INSENSITIVE);
+ for (File file : new File(parentDir).listFiles()) {
+ Matcher matcher = pattern.matcher(file.getName());
+ if (matcher.matches() && ret.size() < maxFileNum) {
+ ret.add(file.getAbsolutePath());
+ }
+ }
+ return ret;
+ }
+
+ public static void main(String[] args) {
+
+ ArrayList<String> fileList = FileScanner.getUpdatedOrNewFiles(
+ "f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100);
+ // fileList = FileScanner.getUpdatedOrNewFiles("F:\\abc\\1.txt", 100);
+ for (String fileName : fileList) {
+ System.out.println(fileName);
+ }
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
new file mode 100644
index 0000000000..bde4a33361
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.file.Task;
+import
org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.file.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Watch directory, if new valid files are created, create jobs
correspondingly.
+ */
+public class LogFileCollectTask extends Task {
+
+ public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileCollectTask.class);
+ private TaskProfile taskProfile;
+ private Db basicDb;
+ private TaskManager taskManager;
+ private InstanceManager instanceManager;
+ private final Map<String, WatchEntity> watchers = new
ConcurrentHashMap<>();
+ private final Set<String> watchFailedDirs = new HashSet<>();
+ private final Map<String/* dataTime */, Map<String/* fileName */,
InstanceProfile>> eventMap =
+ new ConcurrentHashMap<>();
+ public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+ public static final int CORE_THREAD_SLEEP_TIME = 1000;
+ private boolean retry;
+ private long startTime;
+ private long endTime;
+ private boolean initOK = false;
+ private Set<String> originPatterns;
+ private long lastScanTime = 0;
+ public final long SCAN_INTERVAL = 1 * 60 * 1000;
+ private volatile boolean runAtLeastOneTime = false;
+ private volatile boolean running = false;
+
+ @Override
+ public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
+ if (!isProfileValid(taskProfile)) {
+ LOGGER.error("task profile invalid {}", taskProfile);
+ return;
+ }
+ taskManager = (TaskManager) srcManager;
+ commonInit(taskProfile, basicDb);
+ if (retry) {
+ retryInit();
+ } else {
+ watchInit();
+ }
+ initOK = true;
+ }
+
+ private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ this.taskProfile = taskProfile;
+ this.basicDb = basicDb;
+ retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
+ originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
+ .collect(Collectors.toSet());
+ instanceManager = new InstanceManager(taskProfile.getTaskId(),
basicDb);
+ try {
+ instanceManager.start();
+ } catch (Exception e) {
+ LOGGER.error("start instance manager error {}", e.getMessage());
+ }
+ }
+
+ private boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+ boolean ret =
+ profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
+ && profile.hasKey(TaskConstants.TASK_FILE_TIME_OFFSET)
+ && profile.hasKey(TaskConstants.FILE_MAX_NUM);
+ if (!ret) {
+ LOGGER.error("task profile needs file keys");
+ return false;
+ }
+ if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) {
+ long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0);
+ long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0);
+ if (startTime == 0 || endTime == 0) {
+ LOGGER.error("retry task time error start {} end {}",
startTime, endTime);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void retryInit() {
+ startTime = taskProfile.getLong(TaskConstants.TASK_START_TIME, 0);
+ endTime = taskProfile.getLong(TaskConstants.TASK_END_TIME, 0);
+ }
+
+ private void watchInit() {
+ originPatterns.forEach((pathPattern) -> {
+ addPathPattern(pathPattern);
+ });
+ }
+
+ public void addPathPattern(String originPattern) {
+ ArrayList<String> directories =
FilePathUtil.getDirectoryLayers(originPattern);
+ String basicStaticPath = directories.get(0);
+ LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern,
basicStaticPath});
+ /* Remember the failed watcher creations. */
+ if (!new File(basicStaticPath).exists()) {
+ LOGGER.warn(AgentErrMsg.DIRECTORY_NOT_FOUND_ERROR +
basicStaticPath);
+ watchFailedDirs.add(originPattern);
+ return;
+ }
+ try {
+ /*
+ * When we construct the watch object, we should do some work with
the data name, replace yyyy to 4 digits
+ * regression, mm to 2 digits regression, also because of
difference between java regular expression and
+ * linux regular expression, we have to replace * to ., and
replace . with \\. .
+ */
+ WatchService watchService =
FileSystems.getDefault().newWatchService();
+ WatchEntity entity = new WatchEntity(watchService, originPattern,
taskProfile.getCycleUnit(),
+ taskProfile.getTimeOffset());
+ entity.registerRecursively();
+ watchers.put(originPattern, entity);
+ watchFailedDirs.remove(originPattern);
+ } catch (IOException e) {
+ if (e.toString().contains("Too many open files") ||
e.toString().contains("打开的文件过多")) {
+ LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString());
+ } else {
+ LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e);
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ doChangeState(State.SUCCEEDED);
+ if (instanceManager != null) {
+ instanceManager.stop();
+ }
+ releaseWatchers(watchers);
+ }
+
+ private void releaseWatchers(Map<String, WatchEntity> watchers) {
+ while (running) {
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ }
+ watchers.forEach((taskId, watcher) -> {
+ try {
+ watcher.getWatchService().close();
+ } catch (IOException e) {
+ LOGGER.error("close watch service failed taskId {}", e,
taskId);
+ }
+ });
+ }
+
+ @Override
+ public TaskProfile getProfile() {
+ return taskProfile;
+ }
+
+ @Override
+ public String getTaskId() {
+ return taskProfile.getTaskId();
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("directory-task-core-" + getTaskId());
+ running = true;
+ while (!isFinished()) {
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ if (!initOK) {
+ continue;
+ }
+ if (retry) {
+ runForRetry();
+ } else {
+ runForNormal();
+ }
+ }
+ running = false;
+ }
+
+ private void runForRetry() {
+ if (!runAtLeastOneTime) {
+ scanExistingFile();
+ dealWithEvenMap();
+ runAtLeastOneTime = true;
+ }
+ if (instanceManager.allInstanceFinished()) {
+ LOGGER.info("retry task finished, send action to task manager,
taskId {}", getTaskId());
+ TaskAction action = new
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
+ taskManager.submitAction(action);
+ doChangeState(State.SUCCEEDED);
+ }
+ }
+
+ private void runForNormal() {
+ if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+ scanExistingFile();
+ lastScanTime = AgentUtils.getCurrentTime();
+ }
+ runForWatching();
+ dealWithEvenMap();
+ }
+
+ private void scanExistingFile() {
+ originPatterns.forEach((originPattern) -> {
+ List<BasicFileInfo> fileInfos =
scanExistingFileByPattern(originPattern);
+ LOGGER.debug("scan {} get file count {}", originPattern,
fileInfos.size());
+ fileInfos.forEach((fileInfo) -> {
+ addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+ });
+ });
+ }
+
+ private List<BasicFileInfo> scanExistingFileByPattern(String
originPattern) {
+ long startScanTime = startTime;
+ long endScanTime = endTime;
+ if (!retry) {
+ long currentTime = System.currentTimeMillis();
+ // only scan two cycle, like two hours or two days
+ long offset = NewDateUtils.caclOffset("-2" +
taskProfile.getCycleUnit());
+ startScanTime = currentTime - offset;
+ endScanTime = currentTime;
+ }
+ return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern,
startScanTime, endScanTime, retry);
+ }
+
+ private void runForWatching() {
+ /* Deal with those failed watcher creation tasks. */
+ Set<String> tmpWatchFailedDirs = new HashSet<>(watchFailedDirs);
+ for (String originPattern : tmpWatchFailedDirs) {
+ addPathPattern(originPattern);
+ }
+ /*
+ * Visit the watchers to see if it gets any new file creation, if it
exists and fits the file name pattern, add
+ * it to the task list.
+ */
+ for (Map.Entry<String, WatchEntity> entry : watchers.entrySet()) {
+ dealWithWatchEntity(entry.getKey());
+ }
+ }
+
+    /**
+     * Drops expired event buckets (see {@code removeTimeoutEven}) and then, for every
+     * bucket whose data time is already due, submits an ADD action per queued file.
+     * Each submission is retried until the instance manager accepts it. The file is
+     * removed from the bucket once its action has been accepted.
+     */
+    private void dealWithEvenMap() {
+        removeTimeoutEven(eventMap, retry);
+        for (Map.Entry<String, Map<String, InstanceProfile>> bucket : eventMap.entrySet()) {
+            Map<String, InstanceProfile> pending = bucket.getValue();
+            // Decide from the event's data time, the task cycle and the time offset
+            // whether this bucket should be processed now.
+            String dataTime = bucket.getKey();
+            String shouldStartTime = NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(),
+                    taskProfile.getTimeOffset());
+            String currentTime = getCurrentTime();
+            LOGGER.info("taskId {}, dataTime {}, currentTime {}, shouldStartTime {}",
+                    new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime});
+            if (currentTime.compareTo(shouldStartTime) < 0) {
+                continue;
+            }
+            // Submit in InstanceProfile's natural order (creation time according to the
+            // original comment). NOTE(review): a TreeSet keeps only one of several
+            // profiles that compare equal; confirm InstanceProfile.compareTo cannot tie.
+            for (InstanceProfile ordered : new TreeSet<>(pending.values())) {
+                String fileName = ordered.getInstanceId();
+                InstanceAction action = new InstanceAction(ActionType.ADD, pending.get(fileName));
+                while (!instanceManager.submitAction(action)) {
+                    LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId());
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                }
+                pending.remove(fileName);
+            }
+        }
+    }
+
+    /**
+     * Removes event buckets whose data time is no longer valid according to
+     * {@code NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)}.
+     * The original comment describes this as "within two days before or after now".
+     * Retry tasks keep every bucket.
+     *
+     * @param eventMap events grouped by data time
+     * @param isRetry  when true nothing is removed
+     */
+    private void removeTimeoutEven(Map<String, Map<String, InstanceProfile>> eventMap, boolean isRetry) {
+        if (isRetry) {
+            return;
+        }
+        // Remove through the collection view instead of calling eventMap.remove()
+        // while iterating eventMap.entrySet(). The latter fails with
+        // ConcurrentModificationException for non-concurrent Map implementations.
+        eventMap.keySet().removeIf(dataTime -> {
+            boolean expired = !NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL);
+            if (expired) {
+                LOGGER.warn("remove too old event from event map. dataTime {}", dataTime);
+            }
+            return expired;
+        });
+    }
+
+    /**
+     * Returns the current wall-clock time formatted with
+     * {@code NewDateUtils.DEFAULT_FORMAT} in {@code NewDateUtils.DEFAULT_TIME_ZONE}.
+     * A new formatter is built per call because SimpleDateFormat is not thread-safe.
+     */
+    private String getCurrentTime() {
+        SimpleDateFormat formatter = new SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+        formatter.setTimeZone(TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE));
+        return formatter.format(new Date());
+    }
+
+ public synchronized void dealWithWatchEntity(String originPattern) {
+ WatchEntity entity = watchers.get(originPattern);
+ if (entity == null) {
+ LOGGER.error("Can't find the watch entity for originPattern: " +
originPattern);
+ return;
+ }
+ try {
+ /* Get all creation events until all events are consumed. */
+ for (int i = 0; i < entity.getTotalPathSize(); i++) {
+ // maybe the watchService is closed ,but we catch this
exception!
+ final WatchKey key = entity.getWatchService().poll();
+ if (key == null) {
+ return;
+ }
+ dealWithWatchKey(entity, key);
+ }
+ } catch (Exception e) {
+ LOGGER.error("deal with creation event error: ", e);
+ }
+ }
+
+    /**
+     * Handles every event queued on one watch key. Newly created directories are
+     * registered recursively; newly created files go to {@code handleFilePath}.
+     * The key is re-armed afterwards, and the watcher is rebuilt if the key became
+     * invalid.
+     *
+     * @param entity watch entity the key belongs to
+     * @param key    key returned by the entity's watch service
+     * @throws IOException if registering a new directory fails
+     */
+    private void dealWithWatchKey(WatchEntity entity, WatchKey key) throws IOException {
+        Path contextPath = entity.getPath(key);
+        if (contextPath == null) {
+            // The entity no longer maps this key to a directory, so its events cannot
+            // be resolved to file paths. Cancel it instead of throwing a
+            // NullPointerException, which would abort handling of the other keys.
+            LOGGER.warn("Ignore events of an untracked watch key, originPattern {}", entity.getOriginPattern());
+            key.cancel();
+            return;
+        }
+        LOGGER.info("Find creation events in path: " + contextPath.toAbsolutePath());
+        for (WatchEvent<?> watchEvent : key.pollEvents()) {
+            Path child = resolvePathFromEvent(watchEvent, contextPath);
+            if (child == null) {
+                continue;
+            }
+            if (Files.isDirectory(child)) {
+                LOGGER.warn("The find creation event is triggered by a directory: " + child.getFileName());
+                entity.registerRecursively(child);
+                continue;
+            }
+            handleFilePath(child, entity);
+        }
+        resetWatchKey(entity, key, contextPath);
+    }
+
+    /**
+     * Resolves the relative path carried by a watch event against the watched
+     * directory.
+     *
+     * @param watchEvent  event polled from a watch key
+     * @param contextPath directory the key was registered for
+     * @return the resolved child path, or {@code null} for an OVERFLOW event, which
+     *         signals lost events and has no usable path (it is logged as an error)
+     */
+    private Path resolvePathFromEvent(WatchEvent<?> watchEvent, Path contextPath) {
+        final Kind<?> kind = watchEvent.kind();
+        if (kind == StandardWatchEventKinds.OVERFLOW) {
+            // Some events were dropped and we cannot tell which ones, so do not just
+            // silently carry on.
+            LOGGER.error("An event is unclear and lost");
+            // TODO: should we do a full scan to avoid lost events?
+            return null;
+        }
+        // Resolving is required; the relative context alone does not allow reading
+        // the file's attributes.
+        final Path relative = (Path) watchEvent.context();
+        return contextPath.resolve(relative);
+    }
+
+    /**
+     * Queues a newly created file when its absolute path matches (as a prefix) the
+     * entity's file pattern and its data time passes {@code checkFileNameForTime}.
+     * Files that fail the time check are logged and ignored.
+     *
+     * @param filePath path of the new file
+     * @param entity   watch entity that reported the file
+     */
+    private void handleFilePath(Path filePath, WatchEntity entity) {
+        String newFileName = filePath.toFile().getAbsolutePath();
+        String origin = entity.getOriginPattern();
+        LOGGER.info("[New File] {} {}", newFileName, origin);
+        Matcher matcher = entity.getPattern().matcher(newFileName);
+        // lookingAt() is true whenever matches() is, so the prefix test suffices.
+        if (!matcher.lookingAt()) {
+            return;
+        }
+        LOGGER.info("[Matched File] {} {}", newFileName, origin);
+        String dataTime = getDataTimeFromFileName(newFileName, origin, entity.getDateExpression());
+        if (checkFileNameForTime(newFileName, entity)) {
+            addToEvenMap(newFileName, dataTime);
+        } else {
+            LOGGER.error(AgentErrMsg.FILE_ERROR + "File Timeout {} {}", newFileName, dataTime);
+        }
+    }
+
+    /**
+     * Records {@code fileName} as a pending instance under {@code dataTime}. Nothing
+     * is recorded when the instance manager says the file (at its current last-modify
+     * time) should not be added again, or when the file is already pending in memory.
+     *
+     * @param fileName absolute file name, also used as the bucket key
+     * @param dataTime data time of the file, used as the event-map key
+     */
+    private void addToEvenMap(String fileName, String dataTime) {
+        Long lastModifyTime = FileUtils.getFileLastModifyTime(fileName);
+        if (!instanceManager.shouldAddAgain(fileName, lastModifyTime)) {
+            LOGGER.info("file {} has record in db", fileName);
+            return;
+        }
+        Map<String, InstanceProfile> bucket = eventMap.computeIfAbsent(dataTime,
+                ignoredKey -> new ConcurrentHashMap<>());
+        if (bucket.containsKey(fileName)) {
+            LOGGER.error("should not happen! may be {} has been deleted and add again", fileName);
+            return;
+        }
+        bucket.put(fileName, taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE, fileName, dataTime));
+    }
+
+    /**
+     * Checks the data time embedded in a file name against the entity's cycle unit
+     * and time offset. If the pattern has no date part, the file is always accepted.
+     *
+     * @return {@code true} if the file should be collected
+     */
+    private boolean checkFileNameForTime(String newFileName, WatchEntity entity) {
+        PathDateExpression dateExpression = entity.getDateExpression();
+        if (dateExpression.getLongestDatePattern().length() == 0) {
+            return true;
+        }
+        String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), dateExpression);
+        LOGGER.info("file {} ,fileTime {}", newFileName, dataTime);
+        return NewDateUtils.isValidCreationTime(dataTime, entity.getCycleUnit(), entity.getTimeOffset());
+    }
+
+    /**
+     * Extracts the data time from a file name and strips every non-digit character,
+     * e.g. {@code 2015-09-16_00} becomes {@code 2015091600}.
+     *
+     * TODO: what if this file doesn't have any date pattern regex? One option is to
+     * treat the creation of the next file as the signal that the previous task of
+     * this config should finish reading and start reading the new file.
+     *
+     * @param fileName       absolute file name
+     * @param originPattern  configured path pattern
+     * @param dateExpression date expression extracted from the pattern
+     * @return the data time containing digits only
+     */
+    private String getDataTimeFromFileName(String fileName, String originPattern, PathDateExpression dateExpression) {
+        String rawTime = NewDateUtils.getDateTime(fileName, originPattern, dateExpression);
+        return rawTime.replaceAll("\\D", "");
+    }
+
+    /**
+     * Re-arms {@code key}. If the key is invalid after the reset (for example because
+     * it was cancelled or its directory is no longer accessible), the entity's watch
+     * service is closed and replaced by a new one, and all directories are registered
+     * again. IO errors during the rebuild are logged.
+     *
+     * @param entity      entity owning the key
+     * @param key         key whose events have just been handled
+     * @param contextPath directory of the key, used for logging
+     */
+    private void resetWatchKey(WatchEntity entity, WatchKey key, Path contextPath) {
+        key.reset();
+        if (key.isValid()) {
+            return;
+        }
+        LOGGER.warn(AgentErrMsg.WATCHER_INVALID + "Invalid Watcher {}", contextPath.getFileName());
+        try {
+            entity.getWatchService().close();
+            WatchService freshService = FileSystems.getDefault().newWatchService();
+            entity.clearKeys();
+            entity.clearPathToKeys();
+            entity.setWatchService(freshService);
+            entity.registerRecursively();
+        } catch (IOException e) {
+            LOGGER.error("Restart a new watcher runs into error: ", e);
+        }
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java
new file mode 100644
index 0000000000..9bda67b85a
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+/**
+ * Kinds of tasks, each carrying a fixed numeric code.
+ */
+public enum TaskType {
+
+    READER(0),
+    TAILER(1),
+    UPLOADER(2),
+    STATE(3),
+    OTHER(4),
+    DB(5),
+    GAIAReader(6);
+
+    /** Numeric code of this task type; set once at declaration and never changed. */
+    private final int type;
+
+    TaskType(int type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the numeric code of this task type
+     */
+    public int getType() {
+        return type;
+    }
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
new file mode 100644
index 0000000000..ffd74c0100
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+import org.apache.inlong.agent.plugin.utils.file.DateUtils;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
+import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class WatchEntity {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WatchEntity.class);
+ private WatchService watchService;
+ private final String basicStaticPath;
+ private final String originPattern;
+ private final String regexPattern;
+ private final Pattern pattern;
+ private final PathDateExpression dateExpression;
+ private final String originPatternWithoutFileName;
+ private final Pattern patternWithoutFileName;
+ private final boolean containRegexPattern;
+ private final Map<WatchKey, Path> keys = new ConcurrentHashMap<WatchKey,
Path>();
+ private final Map<String, WatchKey> pathToKeys = new
ConcurrentHashMap<String, WatchKey>();
+ private final String dirSeparator = System.getProperty("file.separator");
+ private String cycleUnit;
+ private String timeOffset;
+
+ public WatchEntity(WatchService watchService,
+ String originPattern,
+ String cycleUnit,
+ String timeOffset) {
+ this.watchService = watchService;
+ this.originPattern = originPattern;
+ ArrayList<String> directoryLayers =
FilePathUtil.getDirectoryLayers(originPattern);
+ this.basicStaticPath = directoryLayers.get(0);
+ this.regexPattern =
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
+ pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE |
Pattern.DOTALL | Pattern.MULTILINE);
+ ArrayList<String> directories =
FilePathUtil.cutDirectory(originPattern);
+ this.originPatternWithoutFileName = directories.get(0);
+ this.patternWithoutFileName = Pattern
+
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL |
Pattern.MULTILINE);
+ /*
+ * Get the longest data regex from the data name, it's used if we want
to get out the data time from the file
+ * name.
+ */
+ this.dateExpression =
DateUtils.extractLongestTimeRegexWithPrefixOrSuffix(originPattern);
+ this.containRegexPattern = isPathContainRegexPattern();
+ this.cycleUnit = cycleUnit;
+ this.timeOffset = timeOffset;
+ logger.info("add a new watchEntity {}", this);
+ }
+
+ @Override
+ public String toString() {
+ return "WatchEntity [parentPathName=" + basicStaticPath
+ + ", readFilePattern=" + regexPattern
+ + ", dateExpression=" + dateExpression + ", totalDirPattern="
+ + originPatternWithoutFileName + ", containRegexPattern="
+ + containRegexPattern + ", totalDirRegexPattern="
+ + patternWithoutFileName + ", keys=" + keys + ", pathToKeys="
+ pathToKeys
+ + ", watchService=" + watchService + "]";
+ }
+
+ private boolean isPathContainRegexPattern() {
+ if (originPatternWithoutFileName.contains("YYYY") ||
originPatternWithoutFileName.contains("MM")
+ || originPatternWithoutFileName.contains("DD") ||
originPatternWithoutFileName.contains("hh")) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public boolean isContainRegexPattern() {
+ return containRegexPattern;
+ }
+
+ private int calcPathDepth(String rootDir, String dirName) {
+ // rootDir
+ return 0;
+ }
+
+ private void register(Path dir) throws IOException {
+
+ if (dir == null) {
+ return;
+ }
+
+ String dirName = dir.toAbsolutePath().toString();
+ logger.info(dirName);
+ Matcher matcher = patternWithoutFileName.matcher(dirName);
+ String rootDir =
Paths.get(basicStaticPath).toAbsolutePath().toString();
+ Paths.get(basicStaticPath).toAbsolutePath().getNameCount();
+
+ // must use suffeix match
+ // consider /data/YYYYMMDD/abc/YYYYMMDDhh.*.txt this case
+ if (!pathToKeys.containsKey(dirName) && (matcher.matches() ||
rootDir.equals(dirName))) {
+ WatchKey key = dir.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE);
+ keys.put(key, dir);
+ pathToKeys.put(dirName, key);
+
+ logger.info("Register a new directory: " +
dir.toAbsolutePath().toString());
+ }
+ }
+
+ public void registerRecursively() throws IOException {
+ // register root dir
+ Path rootPath = Paths.get(basicStaticPath);
+ String rootDirName = rootPath.toAbsolutePath().toString();
+ if (!pathToKeys.containsKey(rootDirName)) {
+ WatchKey key = rootPath.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE);
+ keys.put(key, rootPath);
+ pathToKeys.put(rootDirName, key);
+ logger.info("Register a new directory: " + rootDirName);
+ }
+ registerRecursively(rootPath.toFile(),
rootPath.toAbsolutePath().toString().length() + 1);
+ }
+
+ public void registerRecursively(Path dir) throws IOException {
+ Path rootPath = dir;
+ String rootDirName = rootPath.toAbsolutePath().toString();
+ int beginIndex = rootDirName.lastIndexOf(dirSeparator) + 1;
+ if (beginIndex == 0) {
+ return;
+ }
+ int index = originPatternWithoutFileName.indexOf(dirSeparator,
beginIndex + 1);
+ Pattern pattern = getPattern(index);
+ logger.info("beginIndex {} ,index {} ,dirPattern {}",
+ new Object[]{beginIndex, index, pattern.pattern()});
+ if (!pathToKeys.containsKey(rootDirName) && match(pattern,
rootDirName)) {
+ WatchKey key = rootPath.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE);
+ keys.put(key, rootPath);
+ pathToKeys.put(rootDirName, key);
+ logger.info("Register a new directory: " + rootDirName);
+ } else {
+ return;
+ }
+
+ logger.info("rootPath len {}",
rootPath.toAbsolutePath().toString().length());
+
+ registerRecursively(rootPath.toFile(),
rootPath.toAbsolutePath().toString().length() + 1);
+ }
+
+ public void registerRecursively(File dir, int beginIndex) throws
IOException {
+ File[] files = dir.listFiles();
+ if (files == null) {
+ return;
+ }
+ int index = originPatternWithoutFileName.indexOf(dirSeparator,
beginIndex);
+ Pattern pattern = getPattern(index);
+ logger.info("beginIndex {} ,index {} ,dirPattern {}",
+ new Object[]{beginIndex, index, pattern.pattern()});
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].isDirectory()) {
+ String dirName = files[i].toString();
+ Path dirPath = Paths.get(dirName);
+ if (!pathToKeys.containsKey(dirName) && match(pattern,
dirName)) {
+ try {
+ WatchKey key = dirPath
+ .register(watchService,
StandardWatchEventKinds.ENTRY_CREATE);
+ keys.put(key, dirPath);
+ pathToKeys.put(dirName, key);
+ logger.info("Register a new directory: " + dirName);
+ } catch (IOException e) {
+ /**
+ * 捕获异常,不能注册的子目录就忽略。
+ */
+ logger.error("Register directory {} error, skip it. ",
dirName, e);
+ continue;
+ }
+ registerRecursively(files[i].getAbsoluteFile(),
+ files[i].getAbsolutePath().length() + 1);
+ }
+ }
+ }
+ }
+
+ private Pattern getPattern(int index) {
+ String dirPattern = "";
+ if (index == -1) {
+ dirPattern = originPatternWithoutFileName;
+ } else {
+ dirPattern = originPatternWithoutFileName.substring(0, index);
+ }
+ Pattern pattern =
Pattern.compile(NewDateUtils.replaceDateExpressionWithRegex(dirPattern),
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+ return pattern;
+ }
+
+ private boolean match(Pattern pattern, String dirName) {
+ Matcher matcher = pattern.matcher(dirName);
+ return matcher.matches() || matcher.lookingAt();
+ }
+
+ public Path getPath(WatchKey key) {
+ return keys.get(key);
+ }
+
+ public int getTotalPathSize() {
+ return keys.size();
+ }
+
+ public String getWatchPath() {
+ return basicStaticPath;
+ }
+
+ public WatchService getWatchService() {
+ return watchService;
+ }
+
+ public void setWatchService(WatchService watchService) {
+ this.watchService = watchService;
+ }
+
+ public String getRegexPattern() {
+ return regexPattern;
+ }
+
+ public PathDateExpression getDateExpression() {
+ return dateExpression;
+ }
+
+ public String getLongestDatePattern() {
+ return dateExpression.getLongestDatePattern();
+ }
+
+ public NonRegexPatternPosition getPatternPosition() {
+ return dateExpression.getPatternPosition();
+ }
+
+ /*
+ * Remove the watched path which is 3 cycle units earlier than current
task data time, this is because JDK7 starts a
+ * thread for each watch path, which should consume lots of memory.
+ */
+ public void removeUselessWatchDirectories(String curDataTime)
+ throws Exception {
+
+ logger.info("removeUselessWatchDirectories {}", curDataTime);
+
+ /* Calculate the data time which is 3 cycle units earlier than current
task data time. */
+ long curDataTimeMillis =
NewDateUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(curDataTimeMillis);
+ if ("D".equalsIgnoreCase(cycleUnit)) {
+ calendar.add(Calendar.DAY_OF_YEAR, -3);
+ } else if ("h".equalsIgnoreCase(cycleUnit)) {
+ calendar.add(Calendar.HOUR_OF_DAY, -3);
+ } else if ("10m".equalsIgnoreCase(cycleUnit)) {
+ calendar.add(Calendar.MINUTE, -30);
+ }
+
+ /* Calculate the 3 cycle units earlier date. */
+ String year = String.valueOf(calendar.get(Calendar.YEAR));
+ String month = String.valueOf(calendar.get(Calendar.MONTH) + 1);
+ if (month.length() < 2) {
+ month = "0" + month;
+ }
+ String day = String.valueOf(calendar.get(Calendar.DAY_OF_MONTH));
+ if (day.length() < 2) {
+ day = "0" + day;
+ }
+ String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
+ if (hour.length() < 2) {
+ hour = "0" + hour;
+ }
+ String minute = String.valueOf(calendar.get(Calendar.MINUTE));
+ if (minute.length() < 2) {
+ minute = "0" + minute;
+ }
+
+ /* Replace it with the date and get a specified watch path. */
+ String copyDirPattern = new String(originPatternWithoutFileName);
+ copyDirPattern = copyDirPattern.replace("YYYY", year);
+ copyDirPattern = copyDirPattern.replace("MM", month);
+ copyDirPattern = copyDirPattern.replace("DD", day);
+ copyDirPattern = copyDirPattern.replace("hh", hour);
+ copyDirPattern = copyDirPattern.replace("mm", minute);
+
+ Set<String> keys = pathToKeys.keySet();
+ Set<String> tmpKeys = new HashSet<>();
+ tmpKeys.addAll(keys);
+ String rootDir =
Paths.get(basicStaticPath).toAbsolutePath().toString();
+ for (String path : tmpKeys) {
+ /*
+ * Remove the watch path whose path is smaller than the 3 cycle
units earlier.
+ */
+ logger.info("[Path]{} {}", path, copyDirPattern);
+ if (path.compareTo(copyDirPattern) < 0 &&
!copyDirPattern.contains(path)) {
+ WatchKey key = pathToKeys.get(path);
+ key.cancel();
+
+ pathToKeys.remove(path);
+
+ logger.info("Watch path: {} is too old for data time: {}, we
should remove", path,
+ curDataTime);
+ }
+ }
+ }
+
+ public void clearPathToKeys() {
+ pathToKeys.clear();
+ }
+
+ public void clearKeys() {
+ keys.clear();
+ }
+
+ public String getCycleUnit() {
+ return cycleUnit;
+ }
+
+ public String getTimeOffset() {
+ return timeOffset;
+ }
+
+ public String getOriginPattern() {
+ return originPattern;
+ }
+
+ public Pattern getPattern() {
+ return pattern;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 610dbc7574..7850678acd 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -18,13 +18,18 @@
package org.apache.inlong.agent.plugin;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -34,7 +39,8 @@ import java.nio.file.Paths;
public class AgentBaseTestsHelper {
private static final Logger LOGGER =
LoggerFactory.getLogger(AgentBaseTestsHelper.class);
-
+ private static final GsonBuilder gsonBuilder = new
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
+ private static final Gson GSON = gsonBuilder.create();
private final String className;
private Path testRootDir;
private Path parentPath;
@@ -45,8 +51,8 @@ public class AgentBaseTestsHelper {
public AgentBaseTestsHelper setupAgentHome() {
parentPath = Paths.get("./").toAbsolutePath();
- testRootDir = Paths.get(parentPath + File.separator + "logs",
- AgentBaseTestsHelper.class.getSimpleName(), className);
+ testRootDir = Paths
+ .get("/tmp", AgentBaseTestsHelper.class.getSimpleName(),
className);
teardownAgentHome();
boolean result = testRootDir.toFile().mkdirs();
LOGGER.info("try to create {}, result is {}", testRootDir, result);
@@ -54,14 +60,14 @@ public class AgentBaseTestsHelper {
return this;
}
- public Path getTestRootDir() {
- return testRootDir.toAbsolutePath();
- }
-
public Path getParentPath() {
return parentPath;
}
+    /**
+     * @return the test root directory chosen by {@code setupAgentHome()}
+     */
+    public Path getTestRootDir() {
+        return this.testRootDir;
+    }
+
public void teardownAgentHome() {
if (testRootDir != null) {
try {
@@ -71,4 +77,32 @@ public class AgentBaseTestsHelper {
}
}
}
+
+    /**
+     * Builds a file-collect {@link TaskProfile} for tests by converting the
+     * {@link DataConfig} produced by {@code getDataConfig}.
+     */
+    public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
+            TaskStateEnum state) {
+        return TaskProfile.convertToTaskProfile(getDataConfig(taskId, pattern, retry, startTime, endTime, state));
+    }
+
+    /**
+     * Builds the {@link DataConfig} of a test file-collect task. The file-specific
+     * settings are serialized with GSON into the ext params.
+     */
+    private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
+            TaskStateEnum state) {
+        FileTaskConfig fileConfig = new FileTaskConfig();
+        fileConfig.setPattern(pattern); // file path pattern (regex)
+        // time offset: e.g. "-1d" collects data from one day ago, "-2h" from two hours ago
+        fileConfig.setTimeOffset("0d");
+        fileConfig.setMaxFileCount(100); // maximum number of files
+        fileConfig.setCycleUnit("D"); // task cycle: "D" = day, "h" = hour
+        fileConfig.setRetry(retry); // true for a retry (backfill) task
+        fileConfig.setStartTime(startTime);
+        fileConfig.setEndTime(endTime);
+
+        DataConfig config = new DataConfig();
+        config.setInlongGroupId("testGroupId");
+        config.setInlongStreamId("testStreamId");
+        config.setDataReportType(1);
+        config.setTaskType(3); // 3 means file collection
+        config.setTaskId(taskId);
+        // NOTE(review): relies on the declaration order of TaskStateEnum; the original
+        // comment says state 1 is normal and 2 is paused - confirm the ordinals match.
+        config.setState(state.ordinal());
+        config.setExtParams(GSON.toJson(fileConfig));
+        return config;
+    }
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java
new file mode 100644
index 0000000000..a989f3bfaf
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+/**
+ * Minimal stand-in for an instance manager in task tests.
+ */
+public class MockInstanceManager {
+
+    /** Does nothing. */
+    public void stop() {
+        // intentionally empty
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
new file mode 100644
index 0000000000..8a9ed4c021
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import com.google.gson.Gson;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LogFileCollectTask.class)
+@PowerMockIgnore({"javax.management.*"})
+public class TestLogfileCollectTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestLogfileCollectTask.class);
+ private static final ClassLoader LOADER =
TestLogfileCollectTask.class.getClassLoader();
+ private static LogFileCollectTask task;
+ private static AgentBaseTestsHelper helper;
+ private static final Gson GSON = new Gson();
+ private static TaskManager manager;
+ private static MockInstanceManager instanceManager = new
MockInstanceManager();
+ private static String resourceName;
+ private static String fileName;
+ private static String dataTime;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("TestLogfileCollectTask"));
+
+ @BeforeClass
+ public static void setup() {
+ helper = new
AgentBaseTestsHelper(TestLogfileCollectTask.class.getName()).setupAgentHome();
+ Db basicDb = TaskManager.initDb("/localdb");
+ resourceName = LOADER.getResource("test/20230928_1.txt").getPath();
+ File f = new File(resourceName);
+ String pattern = f.getParent() + "/YYYYMMDD_[0-9]+.txt";
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L,
0L, TaskStateEnum.RUNNING);
+ try {
+ String startStr = "2023-09-20 00:00:00";
+ String endStr = "2023-09-30 00:00:00";
+ Date parse = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").parse(startStr);
+ long start = parse.getTime();
+ parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr);
+ long end = parse.getTime();
+ taskProfile.setLong(TaskConstants.TASK_START_TIME, start);
+ taskProfile.setLong(TaskConstants.TASK_END_TIME, end);
+ manager = new TaskManager();
+ task = PowerMockito.spy(new LogFileCollectTask());
+ PowerMockito.doAnswer(invocation -> {
+ fileName = invocation.getArgument(0);
+ dataTime = invocation.getArgument(1);
+ return null;
+ }).when(task, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ task.init(manager, taskProfile, basicDb);
+ EXECUTOR_SERVICE.submit(task);
+ } catch (Exception e) {
+ LOGGER.error("source init error {}", e);
+ Assert.assertTrue("source init error", false);
+ }
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ task.destroy();
+ helper.teardownAgentHome();
+ }
+
+ @Test
+ public void testTaskManager() throws Exception {
+ await().atMost(2, TimeUnit.SECONDS).until(() -> fileName != null &&
dataTime != null);
+ Assert.assertTrue(fileName.compareTo(resourceName) == 0);
+ Assert.assertTrue(dataTime.compareTo("20230928") == 0);
+ PowerMockito.verifyPrivate(task, Mockito.times(1))
+ .invoke("addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ }
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/20230928_1.txt
b/inlong-agent/agent-plugins/src/test/resources/test/20230928_1.txt
new file mode 100644
index 0000000000..780b09709f
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/20230928_1.txt
@@ -0,0 +1,3 @@
+hello line-end-symbol aa
+world line-end-symbol
+agent line-end-symbol