This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7954b7f5c6 [INLONG-9200][Agent] Fix bug: duplicate file collect instance (#9201)
7954b7f5c6 is described below
commit 7954b7f5c642762b84f71e610e2d9be463a9a4e9
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Nov 3 15:05:48 2023 +0800
[INLONG-9200][Agent] Fix bug: duplicate file collect instance (#9201)
* [INLONG-9200][Agent] Fix bug: duplicate file collect instance
* [INLONG-9200][Agent] Fix bug: duplicate file collect instance
* [INLONG-9200][Agent] Fix bug: duplicate file collect instance
---
.../apache/inlong/agent/conf/InstanceProfile.java | 10 ++++-
.../org/apache/inlong/agent/conf/TaskProfile.java | 4 +-
.../inlong/agent/constant/TaskConstants.java | 2 +
.../org/apache/inlong/agent/db/InstanceDb.java | 5 +--
.../org/apache/inlong/agent/db/TaskProfileDb.java | 5 +--
.../agent/core/instance/InstanceManager.java | 15 ++++++--
.../inlong/agent/core/task/file/MemoryManager.java | 24 ++++++++----
.../inlong/agent/core/task/file/TaskManager.java | 4 +-
.../agent/core/instance/TestInstanceManager.java | 7 +++-
.../agent/plugin/sinks/filecollect/ProxySink.java | 3 +-
.../inlong/agent/plugin/sources/LogFileSource.java | 3 +-
.../task/filecollect/LogFileCollectTask.java | 44 ++++++++++++++++------
.../sinks/filecollect/TestSenderManager.java | 2 +-
.../agent/plugin/sources/TestLogFileSource.java | 3 +-
.../agent/plugin/sources/TestMqttConnect.java | 3 +-
15 files changed, 91 insertions(+), 43 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 024b7674eb..5592008085 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -93,9 +93,17 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
setInt(INSTANCE_STATE, state.ordinal());
}
+ public long getFileUpdateTime() {
+ return getLong(TaskConstants.FILE_UPDATE_TIME, 0);
+ }
+
+ public void setFileUpdateTime(long lastUpdateTime) {
+ setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime);
+ }
+
@Override
public boolean allRequiredKeyExist() {
- return true;
+ return hasKey(TaskConstants.FILE_UPDATE_TIME);
}
/**
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 1040afa88a..319f1abf56 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -106,7 +106,8 @@ public class TaskProfile extends AbstractConfiguration {
return GSON.toJson(getConfigStorage());
}
- public InstanceProfile createInstanceProfile(String instanceClass, String
fileName, String dataTime) {
+ public InstanceProfile createInstanceProfile(String instanceClass, String
fileName, String dataTime,
+ long fileUpdateTime) {
InstanceProfile instanceProfile =
InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
@@ -114,6 +115,7 @@ public class TaskProfile extends AbstractConfiguration {
instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
instanceProfile.setState(InstanceStateEnum.DEFAULT);
+ instanceProfile.setFileUpdateTime(fileUpdateTime);
return instanceProfile;
}
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index bbf95cea76..fa2ac856fd 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -166,6 +166,8 @@ public class TaskConstants extends CommonConstants {
public static final String INSTANCE_STATE = "instance.state";
+ public static final String FILE_UPDATE_TIME = "fileUpdateTime";
+
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; //
TODO:delete it
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
index dfc8a17055..db41243fbf 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
@@ -65,10 +65,7 @@ public class InstanceDb {
instance.get(TaskConstants.INSTANCE_ID));
KeyValueEntity entity = new KeyValueEntity(keyName,
instance.toJsonStr(),
instance.get(TaskConstants.INSTANCE_ID));
- KeyValueEntity oldEntity = db.put(entity);
- if (oldEntity != null) {
- LOGGER.warn("instance profile {} has been replaced",
oldEntity.getKey());
- }
+ db.put(entity);
} else {
LOGGER.error("instance profile invalid!");
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
index b524bb09db..d37c3f9b10 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
@@ -64,10 +64,7 @@ public class TaskProfileDb {
String keyName = getKeyByTaskId(task.getTaskId());
KeyValueEntity entity = new KeyValueEntity(keyName,
task.toJsonStr(),
task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS));
- KeyValueEntity oldEntity = db.put(entity);
- if (oldEntity != null) {
- LOGGER.warn("task profile {} has been replaced",
oldEntity.getKey());
- }
+ db.put(entity);
}
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index a6c8381de3..e209535db8 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
public class InstanceManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceManager.class);
- private static final int ACTION_QUEUE_CAPACITY = 100000;
+ private static final int ACTION_QUEUE_CAPACITY = 100;
public static final int CORE_THREAD_SLEEP_TIME = 100;
// task in db
private final InstanceDb instanceDb;
@@ -236,7 +236,11 @@ public class InstanceManager extends AbstractDaemon {
}
private void addInstance(InstanceProfile profile) {
- LOGGER.info("addInstance taskId {} instanceId {}", taskId,
profile.getInstanceId());
+ LOGGER.info("add instance taskId {} instanceId {}", taskId,
profile.getInstanceId());
+ if (!shouldAddAgain(profile.getInstanceId(),
profile.getFileUpdateTime())) {
+ LOGGER.info("shouldAddAgain returns false skip taskId {}
instanceId {}", taskId, profile.getInstanceId());
+ return;
+ }
addToDb(profile);
addToMemory(profile);
}
@@ -274,7 +278,7 @@ public class InstanceManager extends AbstractDaemon {
}
private void addToDb(InstanceProfile profile) {
- LOGGER.info("add instance to db instanceId {} ",
profile.getInstanceId());
+ LOGGER.info("add instance to db state {} instanceId {}",
profile.getState(), profile.getInstanceId());
instanceDb.storeInstance(profile);
}
@@ -287,7 +291,7 @@ public class InstanceManager extends AbstractDaemon {
oldInstance.destroy();
instanceMap.remove(instanceProfile.getInstanceId());
LOGGER.error("old instance {} should not exist, try stop it first",
- instanceProfile);
+ instanceProfile.getInstanceId());
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
@@ -315,13 +319,16 @@ public class InstanceManager extends AbstractDaemon {
public boolean shouldAddAgain(String fileName, long lastModifyTime) {
InstanceProfile profileFromDb = instanceDb.getInstance(taskId,
fileName);
if (profileFromDb == null) {
+ LOGGER.debug("not in db should add {}", fileName);
return true;
} else {
InstanceStateEnum state = profileFromDb.getState();
if (state == InstanceStateEnum.FINISHED && lastModifyTime >
profileFromDb.getModifyTime()) {
+ LOGGER.debug("finished but file update again {}", fileName);
return true;
}
if (state == InstanceStateEnum.DELETE) {
+ LOGGER.debug("delete and add again {}", fileName);
return true;
}
return false;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
index fca9d37c72..a3bfc415e2 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.core.task.file;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +42,8 @@ public class MemoryManager {
private static volatile MemoryManager memoryManager = null;
private final AgentConfiguration conf;
private ConcurrentHashMap<String, Semaphore> semaphoreMap = new
ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Long> lastPrintTime = new
ConcurrentHashMap<>();
+ private static final int PRINT_INTERVAL_MS = 1000;
private MemoryManager() {
this.conf = AgentConfiguration.getAgentConf();
@@ -48,14 +51,17 @@ public class MemoryManager {
semaphore = new Semaphore(
conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
+ lastPrintTime.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, 0L);
semaphore = new Semaphore(
conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
+ lastPrintTime.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, 0L);
semaphore = new Semaphore(
conf.getInt(AGENT_GLOBAL_WRITER_PERMIT,
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
+ lastPrintTime.put(AGENT_GLOBAL_WRITER_PERMIT, 0L);
}
/**
@@ -99,19 +105,23 @@ public class MemoryManager {
return semaphore.availablePermits();
}
- public void printDetail(String semaphoreName) {
+ public void printDetail(String semaphoreName, String detail) {
Semaphore semaphore = semaphoreMap.get(semaphoreName);
if (semaphore == null) {
- LOGGER.error("printDetail {} not exist");
+ LOGGER.error("printDetail {} not exist", semaphoreName);
return;
}
- LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(),
semaphore.getQueueLength(),
- semaphoreName);
+ if (AgentUtils.getCurrentTime() - lastPrintTime.get(semaphoreName) >
PRINT_INTERVAL_MS) {
+ LOGGER.info("{} permit left {} wait {} {}", detail,
semaphore.availablePermits(),
+ semaphore.getQueueLength(),
+ semaphoreName);
+ lastPrintTime.put(semaphoreName, AgentUtils.getCurrentTime());
+ }
}
public void printAll() {
- printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
- printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
- printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+ printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT, "printAll");
+ printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT, "printAll");
+ printDetail(AGENT_GLOBAL_WRITER_PERMIT, "printAll");
}
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 991be20c05..9aa71a96ab 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -369,7 +369,7 @@ public class TaskManager extends AbstractDaemon {
*/
private void addToDb(TaskProfile taskProfile) {
if (taskDb.getTask(taskProfile.getTaskId()) != null) {
- LOGGER.error("task {} should not exist", taskProfile);
+ LOGGER.error("task {} should not exist", taskProfile.getTaskId());
}
taskDb.storeTask(taskProfile);
}
@@ -398,7 +398,7 @@ public class TaskManager extends AbstractDaemon {
oldTask.destroy();
taskMap.remove(taskProfile.getTaskId());
LOGGER.error("old task {} should not exist, try stop it first",
- taskProfile);
+ taskProfile.getTaskId());
}
try {
Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 0745c13c72..e858743cb3 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -64,7 +64,7 @@ public class TestInstanceManager {
public void testInstanceManager() {
long timeBefore = AgentUtils.getCurrentTime();
InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/20230927_1.txt", "20230927");
+ helper.getTestRootDir() + "/20230927_1.txt", "20230927",
AgentUtils.getCurrentTime());
String instanceId = profile.getInstanceId();
InstanceAction action = new InstanceAction();
action.setActionType(ActionType.ADD);
@@ -85,8 +85,11 @@ public class TestInstanceManager {
Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(),
AgentUtils.getCurrentTime()));
// test continue
+ profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
+ helper.getTestRootDir() + "/20230927_1.txt", "20230927",
AgentUtils.getCurrentTime());
+ action = new InstanceAction();
action.setActionType(ActionType.ADD);
- profile.setState(InstanceStateEnum.DEFAULT);
+ action.setProfile(profile);
manager.submitAction(action);
await().atMost(1, TimeUnit.SECONDS).until(() ->
manager.getInstance(instanceId) != null);
Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() ==
InstanceStateEnum.DEFAULT);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 4657cd4bfc..30bbf2d956 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -96,8 +96,7 @@ public class ProxySink extends AbstractSink {
boolean writerPermitSuc = MemoryManager.getInstance()
.tryAcquire(AGENT_GLOBAL_WRITER_PERMIT,
message.getBody().length);
if (!writerPermitSuc) {
- LOGGER.warn("writer tryAcquire failed");
-
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT, "proxy
sink");
return false;
}
cache.generateExtraMap(proxyMessage.getDataKey());
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index ea0e63c95f..1e187e2bf4 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -375,8 +375,7 @@ public class LogFileSource extends AbstractSource {
while (!suc) {
suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
if (!suc) {
- LOGGER.warn("get permit {} failed", permitName);
- MemoryManager.getInstance().printDetail(permitName);
+ MemoryManager.getInstance().printDetail(permitName, "log file
source");
if (!isRunnable()) {
return false;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 26007d0290..11d9330274 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -273,6 +273,17 @@ public class LogFileCollectTask extends Task {
});
}
+ private boolean isInEventMap(String fileName, String dataTime) {
+ Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
+ if (fileToProfile == null) {
+ return false;
+ }
+ if (fileToProfile.get(fileName) == null) {
+ return false;
+ }
+ return true;
+ }
+
private List<BasicFileInfo> scanExistingFileByPattern(String
originPattern) {
long startScanTime = startTime;
long endScanTime = endTime;
@@ -305,14 +316,20 @@ public class LogFileCollectTask extends Task {
removeTimeoutEven(eventMap, retry);
for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue();
- // 根据event的数据时间、业务的周期、偏移量计算出该event是否需要在当前时间处理
+ if (sameDataTimeEvents.isEmpty()) {
+ return;
+ }
+ /*
+ * Calculate whether the event needs to be processed at the
current time based on its data time, business
+ * cycle, and offset
+ */
String dataTime = entry.getKey();
String shouldStartTime =
NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
String currentTime = getCurrentTime();
- LOGGER.info("taskId {}, dataTime {}, currentTime {},
shouldStartTime {}",
- new Object[]{getTaskId(), dataTime, currentTime,
shouldStartTime});
if (currentTime.compareTo(shouldStartTime) >= 0) {
+ LOGGER.info("submit now taskId {}, dataTime {}, currentTime
{}, shouldStartTime {}",
+ new Object[]{getTaskId(), dataTime, currentTime,
shouldStartTime});
/* These codes will sort the FileCreationEvents by create
time. */
Set<InstanceProfile> sortedEvents = new
TreeSet<>(sameDataTimeEvents.values());
/* Check the file end with event creation time in asc order. */
@@ -326,6 +343,9 @@ public class LogFileCollectTask extends Task {
}
sameDataTimeEvents.remove(fileName);
}
+ } else {
+ LOGGER.info("submit later taskId {}, dataTime {}, currentTime
{}, shouldStartTime {}",
+ new Object[]{getTaskId(), dataTime, currentTime,
shouldStartTime});
}
}
}
@@ -335,7 +355,7 @@ public class LogFileCollectTask extends Task {
return;
}
for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
- // 如果event的数据时间在当前时间前(后)2天之内,则有效
+ /* If the data time of the event is within 2 days before (after)
the current time, it is valid */
String dataTime = entry.getKey();
if (!NewDateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
/* Remove it from memory map. */
@@ -429,9 +449,11 @@ public class LogFileCollectTask extends Task {
}
private void addToEvenMap(String fileName, String dataTime) {
- Long lastModifyTime = FileUtils.getFileLastModifyTime(fileName);
- if (!instanceManager.shouldAddAgain(fileName, lastModifyTime)) {
- LOGGER.info("file {} has record in db", fileName);
+ if (isInEventMap(fileName, dataTime)) {
+ return;
+ }
+ Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
+ if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
return;
}
Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.computeIfAbsent(dataTime,
@@ -442,7 +464,7 @@ public class LogFileCollectTask extends Task {
return;
}
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
- fileName, dataTime);
+ fileName, dataTime, fileUpdateTime);
sameDataTimeEvents.put(fileName, instanceProfile);
}
@@ -467,12 +489,12 @@ public class LogFileCollectTask extends Task {
* For this case, we can simple think that the next file creation
means the last task of this conf should finish
* reading and start reading this new file.
*/
- // 从文件名称中提取数据时间
+ // Extract data time from file name
String fileTime = NewDateUtils.getDateTime(fileName, originPattern,
dateExpression);
/**
- * 将文件时间中任意非数字字符替换掉
- * 如2015-09-16_00替换成2015091600
+ * Replace any non-numeric characters in the file time
+ * such as 2015-09-16_00 replace with 2015091600
*/
return fileTime.replaceAll("\\D", "");
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 084af58851..bf96afac74 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -74,7 +74,7 @@ public class TestSenderManager {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
profile = taskProfile.createInstanceProfile("", fileName,
- "20230927");
+ "20230927", AgentUtils.getCurrentTime());
}
@AfterClass
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 23b5027812..5617976077 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.core.task.file.MemoryManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
import com.google.gson.Gson;
@@ -60,7 +61,7 @@ public class TestLogFileSource {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
instanceProfile = taskProfile.createInstanceProfile("",
- fileName, "20230928");
+ fileName, "20230928", AgentUtils.getCurrentTime());
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
index 73f999230d..877f360ca8 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.reader.MqttReader;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.junit.Ignore;
import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public class TestMqttConnect {
@Override
public void run() {
- reader.init(jobProfile.createInstanceProfile("", "", ""));
+ reader.init(jobProfile.createInstanceProfile("", "", "",
AgentUtils.getCurrentTime()));
while (!reader.isFinished()) {
Message message = reader.read();
if (Objects.nonNull(message)) {