This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 1c790a399ea7bb9e964d8215e7d65584aba53705 Author: ganfengtan <[email protected]> AuthorDate: Fri Nov 4 21:18:58 2022 +0800 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333) Co-authored-by: healchow <[email protected]> --- .../agent/core/task/TaskPositionManager.java | 2 + .../sources/reader/file/MonitorTextFile.java | 101 ++++++++++++--------- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java index c3bfa0abe..c5b944210 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java @@ -139,6 +139,8 @@ public class TaskPositionManager extends AbstractDaemon { ConcurrentHashMap<String, Long> positionTemp = new ConcurrentHashMap<>(); ConcurrentHashMap<String, Long> position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp); if (position == null) { + JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId); + positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0)); position = positionTemp; } Long beforePosition = position.getOrDefault(sourcePath, 0L); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java index d8160bd8b..1f5958cdf 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java @@ -21,10 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.attribute.BasicFileAttributes; -import java.util.Objects; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,26 +35,28 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXP import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL; /** - * monitor files + * Monitor for text files */ public final class MonitorTextFile { private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class); - private static volatile MonitorTextFile monitorTextFile = null; // monitor thread pool - private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor( + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("monitor-file")); + private static volatile MonitorTextFile monitorTextFile = null; + private MonitorTextFile() { } /** - * Mode of singleton - * @return MonitorTextFile instance + * Get a singleton instance of MonitorTextFile. + * + * @return monitor text file instance */ public static MonitorTextFile getInstance() { if (monitorTextFile == null) { @@ -68,37 +70,35 @@ public final class MonitorTextFile { } public void monitor(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) { - MonitorEventRunnable monitorEvent = new MonitorEventRunnable(fileReaderOperator, textFileReader); - runningPool.execute(monitorEvent); + EXECUTOR_SERVICE.execute(new MonitorEventRunnable(fileReaderOperator, textFileReader)); } /** - * monitor file event + * Runnable for monitor the file event */ - private class MonitorEventRunnable implements Runnable { + private static class MonitorEventRunnable implements Runnable { private static final int WAIT_TIME = 30; private final FileReaderOperator fileReaderOperator; private final TextFileReader textFileReader; private final Long interval; private final long startTime = System.currentTimeMillis(); + private long lastFlushTime = System.currentTimeMillis(); private String path; - /** - * the last modify time of the file - */ + + // the last modify time of the file private BasicFileAttributes attributesBefore; - public MonitorEventRunnable(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) { - this.fileReaderOperator = fileReaderOperator; + public MonitorEventRunnable(FileReaderOperator readerOperator, TextFileReader textFileReader) { + this.fileReaderOperator = readerOperator; this.textFileReader = textFileReader; - this.interval = Long - .parseLong(fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, INTERVAL_MILLISECONDS)); + this.interval = Long.parseLong( + readerOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, INTERVAL_MILLISECONDS)); try { - this.attributesBefore = Files - .readAttributes(fileReaderOperator.file.toPath(), BasicFileAttributes.class); - this.path = this.fileReaderOperator.file.getCanonicalPath(); + this.attributesBefore = Files.readAttributes(readerOperator.file.toPath(), BasicFileAttributes.class); + this.path = readerOperator.file.getCanonicalPath(); } catch (IOException e) { - LOGGER.error("get {} last modify time error:", fileReaderOperator.file.getName(), e); + LOGGER.error("get {} last modify time error:", readerOperator.file.getName(), e); } } @@ -106,10 +106,10 @@ public final class MonitorTextFile { public void run() { try { TimeUnit.SECONDS.sleep(WAIT_TIME); - LOGGER.info("start {} monitor", this.fileReaderOperator.file.getAbsolutePath()); - while (!this.fileReaderOperator.finished) { - long expireTime = Long.parseLong(fileReaderOperator.jobConf - .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE)); + LOGGER.info("start {} monitor", fileReaderOperator.file.getAbsolutePath()); + while (!fileReaderOperator.finished) { + long expireTime = Long.parseLong( + fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE)); long currentTime = System.currentTimeMillis(); if (expireTime != Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE) && currentTime - this.startTime > expireTime) { @@ -119,39 +119,54 @@ public final class MonitorTextFile { TimeUnit.MILLISECONDS.sleep(interval); } } catch (Exception e) { - LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e); + LOGGER.error(String.format("monitor %s error", fileReaderOperator.file.getName()), e); } } private void listen() throws IOException { BasicFileAttributes attributesAfter; String currentPath; + File file = fileReaderOperator.file; try { - attributesAfter = Files - .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class); - currentPath = this.fileReaderOperator.file.getCanonicalPath(); + attributesAfter = Files.readAttributes(file.toPath(), BasicFileAttributes.class); + currentPath = file.getCanonicalPath(); } catch (Exception e) { - // Set position 0 when split file - this.fileReaderOperator.position = 0; - LOGGER.error("monitor {} error, reset position is 0:", this.fileReaderOperator.file.getName(), e); + // set position 0 when split file + fileReaderOperator.position = 0; + LOGGER.error(String.format("monitor file %s error, reset position to 0", file.getName()), e); return; } - // If change symbolic links + + // if change symbolic links if (attributesAfter.isSymbolicLink() && !path.equals(currentPath)) { - this.fileReaderOperator.position = 0; + fileReaderOperator.position = 0; path = currentPath; } if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0) { - // Not triggered during data sending - if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator - .hasNext()) { - return; - } - this.textFileReader.getData(); - this.textFileReader.mergeData(this.fileReaderOperator); - this.attributesBefore = attributesAfter; - this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator(); + // not triggered during data sending + getFileData(); + attributesBefore = attributesAfter; + return; + } + lastFlushData(); + } + + private void lastFlushData() throws IOException { + long currentTime = System.currentTimeMillis(); + if (interval * 100 > currentTime - lastFlushTime) { + return; + } + getFileData(); + } + + private void getFileData() throws IOException { + if (fileReaderOperator.iterator != null && fileReaderOperator.iterator.hasNext()) { + return; } + this.textFileReader.getData(); + this.textFileReader.mergeData(this.fileReaderOperator); + this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator(); + this.lastFlushTime = System.currentTimeMillis(); } } }
