This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5ab82ce0 [INLONG-9982][Agent] Adjusting the abstraction of source 
code to facilitate rapid addition of sources (#9984)
6b5ab82ce0 is described below

commit 6b5ab82ce0da2023c545c6a4d89b7febb3b92b3a
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Apr 16 18:35:30 2024 +0800

    [INLONG-9982][Agent] Adjusting the abstraction of source code to facilitate 
rapid addition of sources (#9984)
    
    * [INLONG-9982][Agent] Adjusting the abstraction of source code to 
facilitate rapid addition of sources
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * [INLONG-9982][Agent] Modify based on comments
    
    * [INLONG-9982][Agent] Modify based on comments
    
    * [INLONG-9982][Agent] Modify based on comments
    
    ---------
    
    Co-authored-by: AloysZhang <[email protected]>
---
 .../apache/inlong/agent/plugin/file/Source.java    |   6 +
 .../agent/plugin/sources/DatabaseSqlSource.java    |  30 ++
 .../inlong/agent/plugin/sources/KafkaSource.java   | 287 +++-----------
 .../inlong/agent/plugin/sources/LogFileSource.java | 430 ++++-----------------
 .../inlong/agent/plugin/sources/MongoDBSource.java |  30 ++
 .../inlong/agent/plugin/sources/MqttSource.java    |  30 ++
 .../inlong/agent/plugin/sources/OracleSource.java  |  30 ++
 .../agent/plugin/sources/PostgreSQLSource.java     |  30 ++
 .../inlong/agent/plugin/sources/PulsarSource.java  | 277 ++++---------
 .../inlong/agent/plugin/sources/RedisSource.java   |  30 ++
 .../agent/plugin/sources/SQLServerSource.java      |  30 ++
 .../agent/plugin/sources/file/AbstractSource.java  | 335 ++++++++++++++++
 .../reader/file/KubernetesMetadataProvider.java    | 121 ------
 .../agent/plugin/sources/TestLogFileSource.java    |   1 +
 14 files changed, 751 insertions(+), 916 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
index c65f305227..26a5ec733d 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
@@ -50,6 +50,12 @@ public interface Source {
      */
     void init(InstanceProfile profile);
 
+    /**
+     * Executed after init, usually used to start the source's worker thread
+     *
+     */
+    void start();
+
     /**
      * destroy
      */
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 227dd375ef..2df77b4f83 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -88,11 +88,41 @@ public class DatabaseSqlSource extends AbstractSource {
         return readerList;
     }
 
+    @Override
+    protected String getThreadName() {
+        return null;
+    }
+
+    @Override
+    protected void printCurrentState() {
+
+    }
+
+    @Override
+    protected boolean doPrepareToRead() {
+        return false;
+    }
+
+    @Override
+    protected List<SourceData> readFromSource() {
+        return null;
+    }
+
     @Override
     public Message read() {
         return null;
     }
 
+    @Override
+    protected boolean isRunnable() {
+        return runnable;
+    }
+
+    @Override
+    protected void releaseSource() {
+
+    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index fe31730fdc..42e6d7c70f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -17,23 +17,12 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.except.FileException;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.utils.AgentUtils;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,23 +40,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
 import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
@@ -79,45 +53,17 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_PARTITIO
  */
 public class KafkaSource extends AbstractSource {
 
-    @Data
-    @AllArgsConstructor
-    @NoArgsConstructor
-    private class SourceData {
-
-        private byte[] data;
-        private Long offset;
-    }
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSource.class);
-    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
-            0, Integer.MAX_VALUE,
-            1L, TimeUnit.SECONDS,
-            new SynchronousQueue<>(),
-            new AgentThreadFactory("kafka-source"));
-    private BlockingQueue<SourceData> queue;
-    public InstanceProfile profile;
-    private int maxPackSize;
-    private String taskId;
-    private String instanceId;
     private String topic;
     private Properties props = new Properties();
     private String allPartitionOffsets;
     Map<Integer, Long> partitionOffsets = new HashMap<>();
-    private volatile boolean running = false;
-    private volatile boolean runnable = true;
-    private volatile AtomicLong emptyCount = new AtomicLong(0);
-
-    private final Integer CACHE_QUEUE_SIZE = 100000;
-    private final Integer READ_WAIT_TIMEOUT_MS = 10;
-    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
-    private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
-
     private static final String KAFKA_DESERIALIZER_METHOD =
             "org.apache.kafka.common.serialization.ByteArrayDeserializer";
     private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
-    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
-    private boolean isRealTime = false;
     private boolean isRestoreFromDB = false;
+    private KafkaConsumer<String, byte[]> kafkaConsumer;
+    private long offset = 0L;
 
     public KafkaSource() {
     }
@@ -128,15 +74,6 @@ public class KafkaSource extends AbstractSource {
             LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
             this.profile = profile;
             super.init(profile);
-            String cycleUnit = profile.get(TASK_CYCLE_UNIT);
-            if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
-                isRealTime = true;
-                cycleUnit = CycleUnitType.HOUR;
-            }
-            queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
-            maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
-            taskId = profile.getTaskId();
-            instanceId = profile.getInstanceId();
             topic = profile.getInstanceId();
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
             props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
@@ -155,141 +92,82 @@ public class KafkaSource extends AbstractSource {
                             
Long.valueOf(offset.split(TASK_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
                 }
             }
-
-            EXECUTOR_SERVICE.execute(run());
+            kafkaConsumer = getKafkaConsumer();
         } catch (Exception ex) {
             stopRunning();
             throw new FileException("error init stream for " + topic, ex);
         }
     }
 
-    private Runnable run() {
-        return () -> {
-            AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" + 
instanceId);
-            running = true;
-            try {
-                List<PartitionInfo> partitionInfoList;
-                try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props)) {
-                    partitionInfoList = consumer.partitionsFor(topic);
-                }
-
-                props.put(KAFKA_SESSION_TIMEOUT, 30000);
-
-                try (KafkaConsumer<String, byte[]> kafkaConsumer = new 
KafkaConsumer<>(props)) {
-                    if (null != partitionInfoList) {
-                        List<TopicPartition> topicPartitions = new 
ArrayList<>();
-                        for (PartitionInfo partitionInfo : partitionInfoList) {
-                            TopicPartition topicPartition = new 
TopicPartition(partitionInfo.topic(),
-                                    partitionInfo.partition());
-                            topicPartitions.add(topicPartition);
-                        }
-                        kafkaConsumer.assign(topicPartitions);
-
-                        if (!isRestoreFromDB && 
StringUtils.isNotBlank(allPartitionOffsets)) {
-                            for (TopicPartition topicPartition : 
topicPartitions) {
-                                Long offset = 
partitionOffsets.get(topicPartition.partition());
-                                if (ObjectUtils.isNotEmpty(offset)) {
-                                    kafkaConsumer.seek(topicPartition, offset);
-                                }
-                            }
-                        } else {
-                            LOGGER.info("Skip to seek offset");
-                        }
-                    }
-                    doRun(kafkaConsumer);
-                }
-            } catch (Throwable e) {
-                LOGGER.error("do run error maybe topic is configured 
incorrectly: ", e);
-            }
-            running = false;
-        };
+    @Override
+    protected String getThreadName() {
+        return "kafka-source-" + taskId + "-" + instanceId;
     }
 
-    private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
-        long lastPrintTime = 0;
-        while (isRunnable()) {
-            boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-            if (!suc) {
-                break;
-            }
-            ConsumerRecords<String, byte[]> records = 
kafkaConsumer.poll(Duration.ofMillis(1000));
-            if (records.isEmpty()) {
-                if (queue.isEmpty()) {
-                    emptyCount.incrementAndGet();
-                } else {
-                    emptyCount.set(0);
-                }
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-                AgentUtils.silenceSleepInSeconds(1);
-                continue;
-            }
-            emptyCount.set(0);
-            long offset = 0L;
-            for (ConsumerRecord<String, byte[]> record : records) {
-                SourceData sourceData = new SourceData(record.value(), 
record.offset());
-                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
-                if (!suc4Queue) {
-                    break;
-                }
-                putIntoQueue(sourceData);
-                offset = record.offset();
-            }
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-            kafkaConsumer.commitSync();
-
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
-                lastPrintTime = AgentUtils.getCurrentTime();
-                LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
-            }
-        }
+    @Override
+    protected boolean doPrepareToRead() {
+        return true;
     }
 
-    private boolean waitForPermit(String permitName, int permitLen) {
-        boolean suc = false;
-        while (!suc) {
-            suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
-            if (!suc) {
-                MemoryManager.getInstance().printDetail(permitName, "log file 
source");
-                if (!isRunnable()) {
-                    return false;
-                }
-                AgentUtils.silenceSleepInSeconds(1);
-            }
+    @Override
+    protected List<SourceData> readFromSource() {
+        List<SourceData> dataList = new ArrayList<>();
+        ConsumerRecords<String, byte[]> records = 
kafkaConsumer.poll(Duration.ofMillis(1000));
+        for (ConsumerRecord<String, byte[]> record : records) {
+            SourceData sourceData = new SourceData(record.value(), 
record.offset());
+            dataList.add(sourceData);
+            offset = record.offset();
         }
-        return true;
+        kafkaConsumer.commitSync();
+        return dataList;
     }
 
-    private void putIntoQueue(SourceData sourceData) {
-        if (sourceData == null) {
-            return;
-        }
+    private KafkaConsumer<String, byte[]> getKafkaConsumer() {
+        List<PartitionInfo> partitionInfoList;
+        KafkaConsumer<String, byte[]> kafkaConsumer = null;
+        props.put(KAFKA_SESSION_TIMEOUT, 30000);
         try {
-            boolean offerSuc = false;
-            if (queue.remainingCapacity() > 0) {
-                while (isRunnable() && !offerSuc) {
-                    offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+            kafkaConsumer = new KafkaConsumer<>(props);
+            partitionInfoList = kafkaConsumer.partitionsFor(topic);
+            if (partitionInfoList == null) {
+                kafkaConsumer.close();
+                return null;
+            }
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            for (PartitionInfo partitionInfo : partitionInfoList) {
+                TopicPartition topicPartition = new 
TopicPartition(partitionInfo.topic(),
+                        partitionInfo.partition());
+                topicPartitions.add(topicPartition);
+            }
+            kafkaConsumer.assign(topicPartitions);
+            if (!isRestoreFromDB && 
StringUtils.isNotBlank(allPartitionOffsets)) {
+                for (TopicPartition topicPartition : topicPartitions) {
+                    Long offset = 
partitionOffsets.get(topicPartition.partition());
+                    if (ObjectUtils.isNotEmpty(offset)) {
+                        kafkaConsumer.seek(topicPartition, offset);
+                    }
                 }
+            } else {
+                LOGGER.info("Skip to seek offset");
             }
-
-            if (!offerSuc) {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
+            return kafkaConsumer;
+        } catch (Exception e) {
+            if (kafkaConsumer != null) {
+                kafkaConsumer.close();
             }
-            LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(), 
topic);
-        } catch (InterruptedException e) {
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
-            LOGGER.error("fetchData offer failed {}", e.getMessage());
+            LOGGER.error("get kafka consumer error", e);
         }
+        return null;
     }
 
-    public boolean isRunnable() {
-        return runnable;
+    @Override
+    protected void printCurrentState() {
+        LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
     }
 
-    /**
-     * Stop running threads.
-     */
-    public void stopRunning() {
-        runnable = false;
+    @Override
+    protected boolean isRunnable() {
+        return runnable;
     }
 
     @Override
@@ -298,57 +176,14 @@ public class KafkaSource extends AbstractSource {
     }
 
     @Override
-    public Message read() {
-        SourceData sourceData = null;
-        try {
-            sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            LOGGER.warn("poll {} data get interrupted.", topic, e);
-        }
-        if (sourceData == null) {
-            return null;
-        }
-        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
-        Message finalMsg = createMessage(sourceData);
-        return finalMsg;
-    }
-
-    private Message createMessage(SourceData sourceData) {
-        String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
-        Map<String, String> header = new HashMap<>();
-        header.put(PROXY_KEY_DATA, proxyPartitionKey);
-        header.put(OFFSET, sourceData.offset.toString());
-        header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
-
-        long auditTime = 0;
-        if (isRealTime) {
-            auditTime = AgentUtils.getCurrentTime();
-        } else {
-            auditTime = profile.getSinkDataTime();
-        }
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                auditTime, 1, sourceData.data.length);
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
-                AgentUtils.getCurrentTime(), 1, sourceData.data.length);
-        Message finalMsg = new DefaultMessage(sourceData.data, header);
-        if (finalMsg.getBody().length > maxPackSize) {
-            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
-                    finalMsg.getBody().length, maxPackSize);
-            return null;
-        }
-        return finalMsg;
+    public boolean sourceExist() {
+        return true;
     }
 
     @Override
-    public boolean sourceFinish() {
-        if (isRealTime) {
-            return false;
+    protected void releaseSource() {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
         }
-        return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST;
-    }
-
-    @Override
-    public boolean sourceExist() {
-        return true;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 9f83f6fc06..037470fb41 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -17,32 +17,17 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.task.MemoryManager;
-import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.except.FileException;
-import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
-import 
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DateTransUtils;
 
-import com.google.gson.Gson;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,93 +38,28 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
 import java.io.RandomAccessFile;
-import java.lang.reflect.Constructor;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
-import static org.apache.inlong.agent.constant.MetadataConstants.DATA_CONTENT;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.DATA_CONTENT_TIME;
-import static org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_FILE_NAME;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_NAME;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
-import static 
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
-import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_META_ENV_LIST;
 
 /**
  * Read text files
  */
 public class LogFileSource extends AbstractSource {
 
-    @Data
-    @AllArgsConstructor
-    @NoArgsConstructor
-    private class SourceData {
-
-        private String data;
-        private Long offset;
-    }
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileSource.class);
-    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
-            0, Integer.MAX_VALUE,
-            1L, TimeUnit.SECONDS,
-            new SynchronousQueue<>(),
-            new AgentThreadFactory("log-file-source"));
-    private final Integer BATCH_READ_LINE_COUNT = 10000;
-    private final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
-    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
-    private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
     private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
-    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
     private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
-    private final Integer READ_WAIT_TIMEOUT_MS = 10;
-    private final SimpleDateFormat RECORD_TIME_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-    public InstanceProfile profile;
-    private String taskId;
-    private String instanceId;
-    private int maxPackSize;
+
     private String fileName;
     private File file;
     private byte[] bufferToReadFile;
     public volatile long linePosition = 0;
     public volatile long bytePosition = 0;
-    private boolean needMetadata = false;
-    public Map<String, String> metadata;
     private boolean isIncrement = false;
-    private BlockingQueue<SourceData> queue;
-    private final Gson GSON = new Gson();
-    private volatile boolean runnable = true;
     private volatile boolean fileExist = true;
     private String inodeInfo;
     private volatile long lastInodeUpdateTime = 0;
-    private volatile boolean running = false;
-    private long dataTime = 0;
-    private volatile long emptyCount = 0;
-    private ExtendedHandler extendedHandler;
-    private boolean isRealTime = false;
+    private RandomAccessFile randomAccessFile;
 
     public LogFileSource() {
     }
@@ -148,17 +68,8 @@ public class LogFileSource extends AbstractSource {
     public void init(InstanceProfile profile) {
         try {
             LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
-            this.profile = profile;
             super.init(profile);
-            String cycleUnit = profile.get(TASK_CYCLE_UNIT);
-            if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
-                isRealTime = true;
-                cycleUnit = CycleUnitType.HOUR;
-            }
-            taskId = profile.getTaskId();
-            instanceId = profile.getInstanceId();
             fileName = profile.getInstanceId();
-            maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
             bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
             isIncrement = isIncrement(profile);
             file = new File(fileName);
@@ -166,28 +77,65 @@ public class LogFileSource extends AbstractSource {
             lastInodeUpdateTime = AgentUtils.getCurrentTime();
             linePosition = getInitLineOffset(isIncrement, taskId, instanceId, 
inodeInfo);
             bytePosition = getBytePositionByLine(linePosition);
-            queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
-            dataTime = 
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit);
-            if 
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
 != 0) {
-                Constructor<?> constructor =
-                        Class.forName(
-                                
profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, 
DEFAULT_FILE_SOURCE_EXTEND_CLASS))
-                                .getDeclaredConstructor(InstanceProfile.class);
-                constructor.setAccessible(true);
-                extendedHandler = (ExtendedHandler) 
constructor.newInstance(profile);
-            }
-            try {
-                registerMeta(profile);
-            } catch (Exception ex) {
-                LOGGER.error("init metadata error", ex);
-            }
-            EXECUTOR_SERVICE.execute(run());
+            randomAccessFile = new RandomAccessFile(file, "r");
         } catch (Exception ex) {
             stopRunning();
             throw new FileException("error init stream for " + file.getPath(), 
ex);
         }
     }
 
+    @Override
+    protected boolean doPrepareToRead() {
+        if (isInodeChanged()) {
+            fileExist = false;
+            LOGGER.info("inode changed, instance will restart and offset will 
be clean, file {}",
+                    fileName);
+            return false;
+        }
+        if (file.length() < bytePosition) {
+            fileExist = false;
+            LOGGER.info("file rotate, instance will restart and offset will be 
clean, file {}",
+                    fileName);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected List<SourceData> readFromSource() {
+        try {
+            return readFromPos(bytePosition);
+        } catch (FileNotFoundException e) {
+            fileExist = false;
+            LOGGER.error("readFromPos file deleted error: ", e);
+        } catch (IOException e) {
+            LOGGER.error("readFromPos error: ", e);
+        }
+        return null;
+    }
+
+    @Override
+    protected void printCurrentState() {
+        LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len 
{}", file.getName(), linePosition,
+                bytePosition, file.length());
+    }
+
+    @Override
+    protected String getThreadName() {
+        return "log-file-source-" + taskId + "-" + fileName;
+    }
+
+    private List<SourceData> readFromPos(long pos) throws IOException {
+        List<byte[]> lines = new ArrayList<>();
+        List<SourceData> dataList = new ArrayList<>();
+        bytePosition = readLines(randomAccessFile, pos, lines, 
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
+        for (int i = 0; i < lines.size(); i++) {
+            linePosition++;
+            dataList.add(new SourceData(lines.get(i), linePosition));
+        }
+        return dataList;
+    }
+
     private int getRealLineCount(String fileName) {
         try (LineNumberReader lineNumberReader = new LineNumberReader(new 
FileReader(instanceId))) {
             lineNumberReader.skip(Long.MAX_VALUE);
@@ -199,7 +147,6 @@ public class LogFileSource extends AbstractSource {
     }
 
     private long getInitLineOffset(boolean isIncrement, String taskId, String 
instanceId, String inodeInfo) {
-        OffsetProfile offsetProfile = 
OffsetManager.getInstance().getOffset(taskId, instanceId);
         long offset = 0;
         if (offsetProfile != null && 
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
             offset = offsetProfile.getOffset();
@@ -229,24 +176,6 @@ public class LogFileSource extends AbstractSource {
         return file;
     }
 
-    public void registerMeta(InstanceProfile jobConf) {
-        if (!jobConf.hasKey(TASK_FILE_META_ENV_LIST)) {
-            return;
-        }
-        String[] env = jobConf.get(TASK_FILE_META_ENV_LIST).split(COMMA);
-        Arrays.stream(env).forEach(data -> {
-            if (data.equalsIgnoreCase(KUBERNETES)) {
-                needMetadata = true;
-                new KubernetesMetadataProvider(this).getData();
-            } else if (data.equalsIgnoreCase(ENV_CVM)) {
-                needMetadata = true;
-                metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
-                metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
-                metadata.put(METADATA_FILE_NAME, file.getName());
-            }
-        });
-    }
-
     private boolean isIncrement(InstanceProfile profile) {
         if (profile.hasKey(TaskConstants.TASK_FILE_CONTENT_COLLECT_TYPE) && 
DataCollectType.INCREMENT
                 
.equalsIgnoreCase(profile.get(TaskConstants.TASK_FILE_CONTENT_COLLECT_TYPE))) {
@@ -262,7 +191,7 @@ public class LogFileSource extends AbstractSource {
         try {
             input = new RandomAccessFile(file, "r");
             while (readCount < linePosition) {
-                List<String> lines = new ArrayList<>();
+                List<byte[]> lines = new ArrayList<>();
                 pos = readLines(input, pos, lines, Math.min((int) 
(linePosition - readCount), BATCH_READ_LINE_COUNT),
                         BATCH_READ_LINE_TOTAL_LEN, true);
                 readCount += lines.size();
@@ -289,7 +218,7 @@ public class LogFileSource extends AbstractSource {
      * @return The new position after the lines have been read
      * @throws IOException if an I/O error occurs.
      */
-    private long readLines(RandomAccessFile reader, long pos, List<String> 
lines, int maxLineCount, int maxLineTotalLen,
+    private long readLines(RandomAccessFile reader, long pos, List<byte[]> 
lines, int maxLineCount, int maxLineTotalLen,
             boolean isCounting)
             throws IOException {
         if (maxLineCount == 0) {
@@ -309,11 +238,10 @@ public class LogFileSource extends AbstractSource {
                 switch (ch) {
                     case '\n':
                         if (isCounting) {
-                            lines.add(new String(""));
+                            lines.add(null);
                         } else {
-                            String temp = new String(baos.toByteArray(), 
StandardCharsets.UTF_8);
-                            lines.add(temp);
-                            lineTotalLen += temp.length();
+                            lines.add(baos.toByteArray());
+                            lineTotalLen += baos.size();
                         }
                         rePos = pos + i + 1;
                         if (overLen) {
@@ -350,79 +278,6 @@ public class LogFileSource extends AbstractSource {
         return rePos;
     }
 
-    @Override
-    public Message read() {
-        SourceData sourceData = null;
-        try {
-            sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            LOGGER.warn("poll {} data get interrupted.", file.getPath(), e);
-        }
-        if (sourceData == null) {
-            return null;
-        }
-        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
-        Message finalMsg = createMessage(sourceData);
-        return finalMsg;
-    }
-
-    private Message createMessage(SourceData sourceData) {
-        String msgWithMetaData = fillMetaData(sourceData.data);
-        String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
-        Map<String, String> header = new HashMap<>();
-        header.put(PROXY_KEY_DATA, proxyPartitionKey);
-        header.put(OFFSET, sourceData.offset.toString());
-        header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
-        if (extendedHandler != null) {
-            extendedHandler.dealWithHeader(header, 
sourceData.getData().getBytes(StandardCharsets.UTF_8));
-        }
-        long auditTime = 0;
-        if (isRealTime) {
-            auditTime = AgentUtils.getCurrentTime();
-        } else {
-            auditTime = profile.getSinkDataTime();
-        }
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                auditTime, 1, msgWithMetaData.length());
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
-                AgentUtils.getCurrentTime(), 1, msgWithMetaData.length());
-        Message finalMsg = new 
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
-        // if the message size is greater than max pack size,should drop it.
-        if (finalMsg.getBody().length > maxPackSize) {
-            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
-                    finalMsg.getBody().length, maxPackSize);
-            return null;
-        }
-        return finalMsg;
-    }
-
-    public String fillMetaData(String message) {
-        if (!needMetadata) {
-            return message;
-        }
-        long timestamp = System.currentTimeMillis();
-        boolean isJson = FileDataUtils.isJSON(message);
-        Map<String, String> mergeData = new HashMap<>(metadata);
-        mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(message, 
isJson));
-        mergeData.put(DATA_CONTENT_TIME, RECORD_TIME_FORMAT.format(new 
Date(timestamp)));
-        return GSON.toJson(mergeData);
-    }
-
-    private boolean waitForPermit(String permitName, int permitLen) {
-        boolean suc = false;
-        while (!suc) {
-            suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
-            if (!suc) {
-                MemoryManager.getInstance().printDetail(permitName, "log file 
source");
-                if (isInodeChanged() || !isRunnable()) {
-                    return false;
-                }
-                AgentUtils.silenceSleepInSeconds(1);
-            }
-        }
-        return true;
-    }
-
     private boolean isInodeChanged() {
         if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > 
INODE_UPDATE_INTERVAL_MS) {
             try {
@@ -435,159 +290,9 @@ public class LogFileSource extends AbstractSource {
         return false;
     }
 
-    private Runnable run() {
-        return () -> {
-            AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" + 
file);
-            running = true;
-            try {
-                doRun();
-            } catch (Throwable e) {
-                LOGGER.error("do run error maybe file deleted: ", e);
-            }
-            running = false;
-        };
-    }
-
-    private void doRun() {
-        long lastPrintTime = 0;
-        while (isRunnable() && fileExist) {
-            if (isInodeChanged()) {
-                fileExist = false;
-                LOGGER.info("inode changed, instance will restart and offset 
will be clean, file {}",
-                        fileName);
-                break;
-            }
-            if (file.length() < bytePosition) {
-                fileExist = false;
-                LOGGER.info("file rotate, instance will restart and offset 
will be clean, file {}",
-                        fileName);
-                break;
-            }
-            boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-            if (!suc) {
-                break;
-            }
-            List<SourceData> lines = null;
-            try {
-                lines = readFromPos(bytePosition);
-            } catch (FileNotFoundException e) {
-                fileExist = false;
-                LOGGER.error("readFromPos file deleted error: ", e);
-            } catch (IOException e) {
-                LOGGER.error("readFromPos error: ", e);
-            }
-            if (lines.isEmpty()) {
-                if (queue.isEmpty()) {
-                    emptyCount++;
-                } else {
-                    emptyCount = 0;
-                }
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-                AgentUtils.silenceSleepInSeconds(1);
-                continue;
-            }
-            emptyCount = 0;
-            for (int i = 0; i < lines.size(); i++) {
-                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
-                if (!suc4Queue) {
-                    break;
-                }
-                putIntoQueue(lines.get(i));
-            }
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
-                lastPrintTime = AgentUtils.getCurrentTime();
-                LOGGER.info("path is {}, linePosition {}, bytePosition is {} 
file len {}, reads lines size {}",
-                        file.getName(), linePosition, bytePosition, 
file.length(), lines.size());
-            }
-        }
-    }
-
-    private void putIntoQueue(SourceData sourceData) {
-        if (sourceData == null) {
-            return;
-        }
-        try {
-            boolean offerSuc = false;
-            while (isRunnable() && offerSuc != true) {
-                offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
-            }
-            if (!offerSuc) {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
-            }
-            LOGGER.debug("Read {} from file {}", sourceData.getData(), 
fileName);
-        } catch (InterruptedException e) {
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
-            LOGGER.error("fetchData offer failed {}", e.getMessage());
-        }
-    }
-
-    /**
-     * Whether threads can in running state with while loop.
-     *
-     * @return true if threads can run
-     */
-    public boolean isRunnable() {
-        return runnable;
-    }
-
-    /**
-     * Stop running threads.
-     */
-    public void stopRunning() {
-        runnable = false;
-    }
-
-    private List<SourceData> readFromPos(long pos) throws IOException {
-        List<String> lines = new ArrayList<>();
-        List<SourceData> dataList = new ArrayList<>();
-        RandomAccessFile input = new RandomAccessFile(file, "r");
-        bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT, 
BATCH_READ_LINE_TOTAL_LEN, false);
-        for (int i = 0; i < lines.size(); i++) {
-            linePosition++;
-            dataList.add(new SourceData(lines.get(i), linePosition));
-        }
-        if (input != null) {
-            input.close();
-        }
-        return dataList;
-    }
-
     @Override
-    public void destroy() {
-        LOGGER.info("destroy read source name {}", fileName);
-        stopRunning();
-        while (running) {
-            AgentUtils.silenceSleepInMs(1);
-        }
-        clearQueue(queue);
-        LOGGER.info("destroy read source name {} end", fileName);
-    }
-
-    private void clearQueue(BlockingQueue<SourceData> queue) {
-        if (queue == null) {
-            return;
-        }
-        while (queue != null && !queue.isEmpty()) {
-            SourceData sourceData = null;
-            try {
-                sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                LOGGER.warn("poll {} data get interrupted.", file.getPath(), 
e);
-            }
-            if (sourceData != null) {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
-            }
-        }
-        queue.clear();
-    }
-
-    @Override
-    public boolean sourceFinish() {
-        if (isRealTime) {
-            return false;
-        }
-        return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
+    protected boolean isRunnable() {
+        return runnable && fileExist && !isInodeChanged();
     }
 
     @Override
@@ -599,4 +304,15 @@ public class LogFileSource extends AbstractSource {
    // Splitting into multiple readers is not supported for this source; always returns null.
    public List<Reader> split(TaskProfile jobConf) {
        return null;
    }
+
+    @Override
+    protected void releaseSource() {
+        if (randomAccessFile != null) {
+            try {
+                randomAccessFile.close();
+            } catch (IOException e) {
+                LOGGER.error("close randomAccessFile error", e);
+            }
+        }
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index 2f96fe66a2..555d87b412 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -44,11 +44,41 @@ public class MongoDBSource extends AbstractSource {
         return readerList;
     }
 
    @Override
    protected String getThreadName() {
        // TODO(review): stub — no worker thread name defined for the MongoDB source yet.
        return null;
    }

    @Override
    protected void printCurrentState() {
        // TODO(review): stub — no progress reporting implemented for this source yet.
    }

    @Override
    protected boolean doPrepareToRead() {
        // TODO(review): stub — always reports "not ready"; reading is not implemented.
        return false;
    }

    @Override
    protected List<SourceData> readFromSource() {
        // TODO(review): stub — returns no data until the MongoDB reader is implemented.
        return null;
    }

    @Override
    public Message read() {
        // TODO(review): stub — message reading is not implemented for this source.
        return null;
    }

    @Override
    protected boolean isRunnable() {
        // Runs until stopRunning() clears the flag.
        return runnable;
    }

    @Override
    protected void releaseSource() {
        // TODO(review): stub — no resources held, so nothing to release yet.
    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index bb333c2d27..5f14cf3027 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -79,11 +79,41 @@ public class MqttSource extends AbstractSource {
         return readerList;
     }
 
    @Override
    protected String getThreadName() {
        // TODO(review): stub — no worker thread name defined for the MQTT source yet.
        return null;
    }

    @Override
    protected void printCurrentState() {
        // TODO(review): stub — no progress reporting implemented for this source yet.
    }

    @Override
    protected boolean doPrepareToRead() {
        // TODO(review): stub — always reports "not ready"; reading is not implemented.
        return false;
    }

    @Override
    protected List<SourceData> readFromSource() {
        // TODO(review): stub — returns no data until the MQTT reader is implemented.
        return null;
    }

    @Override
    public Message read() {
        // TODO(review): stub — message reading is not implemented for this source.
        return null;
    }

    @Override
    protected boolean isRunnable() {
        // Runs until stopRunning() clears the flag.
        return runnable;
    }

    @Override
    protected void releaseSource() {
        // TODO(review): stub — no resources held, so nothing to release yet.
    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
index d4c86e7634..1d3088a0aa 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -48,11 +48,41 @@ public class OracleSource extends AbstractSource {
         return readerList;
     }
 
    @Override
    protected String getThreadName() {
        // TODO(review): stub — no worker thread name defined for the Oracle source yet.
        return null;
    }

    @Override
    protected void printCurrentState() {
        // TODO(review): stub — no progress reporting implemented for this source yet.
    }

    @Override
    protected boolean doPrepareToRead() {
        // TODO(review): stub — always reports "not ready"; reading is not implemented.
        return false;
    }

    @Override
    protected List<SourceData> readFromSource() {
        // TODO(review): stub — returns no data until the Oracle reader is implemented.
        return null;
    }

    @Override
    public Message read() {
        // TODO(review): stub — message reading is not implemented for this source.
        return null;
    }

    @Override
    protected boolean isRunnable() {
        // Runs until stopRunning() clears the flag.
        return runnable;
    }

    @Override
    protected void releaseSource() {
        // TODO(review): stub — no resources held, so nothing to release yet.
    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
index 58f03cfa9b..7a65767c1b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -49,11 +49,41 @@ public class PostgreSQLSource extends AbstractSource {
         return readerList;
     }
 
    @Override
    protected String getThreadName() {
        // TODO(review): stub — no worker thread name defined for the PostgreSQL source yet.
        return null;
    }

    @Override
    protected void printCurrentState() {
        // TODO(review): stub — no progress reporting implemented for this source yet.
    }

    @Override
    protected boolean doPrepareToRead() {
        // TODO(review): stub — always reports "not ready"; reading is not implemented.
        return false;
    }

    @Override
    protected List<SourceData> readFromSource() {
        // TODO(review): stub — returns no data until the PostgreSQL reader is implemented.
        return null;
    }

    @Override
    public Message read() {
        // TODO(review): stub — message reading is not implemented for this source.
        return null;
    }

    @Override
    protected boolean isRunnable() {
        // Runs until stopRunning() clears the flag.
        return runnable;
    }

    @Override
    protected void releaseSource() {
        // TODO(review): stub — no resources held, so nothing to release yet.
    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index c64653e2a3..64974d7994 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -17,23 +17,12 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.except.FileException;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.utils.AgentUtils;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -44,26 +33,11 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
 import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_RESET_TIME;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SERVICE_URL;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION;
@@ -72,27 +46,7 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRI
 
 public class PulsarSource extends AbstractSource {
 
-    @Data
-    @AllArgsConstructor
-    @NoArgsConstructor
-    private class SourceData {
-
-        private byte[] data;
-        private Long offset;
-    }
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarSource.class);
-    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
-            0, Integer.MAX_VALUE,
-            1L, TimeUnit.SECONDS,
-            new SynchronousQueue<>(),
-            new AgentThreadFactory("pulsar-source"));
-    private BlockingQueue<SourceData> queue;
-    public InstanceProfile profile;
-    private int maxPackSize;
-    private String inlongStreamId;
-    private String taskId;
-    private String instanceId;
     private String topic;
     private String serviceUrl;
     private String subscription;
@@ -100,18 +54,10 @@ public class PulsarSource extends AbstractSource {
     private String subscriptionPosition;
     private PulsarClient pulsarClient;
     private Long timestamp;
-    private volatile boolean running = false;
-    private volatile boolean runnable = true;
-    private volatile AtomicLong emptyCount = new AtomicLong(0);
-
-    private final Integer CACHE_QUEUE_SIZE = 100000;
-    private final Integer READ_WAIT_TIMEOUT_MS = 10;
-    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
-    private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
-    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
     private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
-    private boolean isRealTime = false;
     private boolean isRestoreFromDB = false;
+    private Consumer<byte[]> consumer;
+    private long offset = 0L;
 
     public PulsarSource() {
     }
@@ -120,18 +66,7 @@ public class PulsarSource extends AbstractSource {
     public void init(InstanceProfile profile) {
         try {
             LOGGER.info("PulsarSource init: {}", profile.toJsonStr());
-            this.profile = profile;
             super.init(profile);
-            String cycleUnit = profile.get(TASK_CYCLE_UNIT);
-            if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
-                isRealTime = true;
-                cycleUnit = CycleUnitType.HOUR;
-            }
-            queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
-            maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
-            inlongStreamId = profile.getInlongStreamId();
-            taskId = profile.getTaskId();
-            instanceId = profile.getInstanceId();
             topic = profile.getInstanceId();
             serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
             subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, 
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
@@ -141,183 +76,101 @@ public class PulsarSource extends AbstractSource {
             timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
             pulsarClient = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
             isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
-
-            EXECUTOR_SERVICE.execute(run());
+            consumer = getConsumer();
         } catch (Exception ex) {
             stopRunning();
             throw new FileException("error init stream for " + topic, ex);
         }
     }
 
-    private Runnable run() {
-        return () -> {
-            AgentThreadFactory.nameThread("pulsar-source-" + taskId + "-" + 
instanceId);
-            running = true;
-            try {
-                try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer(Schema.BYTES)
-                        .topic(topic)
-                        .subscriptionName(subscription)
-                        
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
-                        
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
-                        .subscribe()) {
-
-                    if (!isRestoreFromDB && timestamp != 0L) {
-                        consumer.seek(timestamp);
-                        LOGGER.info("Reset consume from {}", timestamp);
-                    } else {
-                        LOGGER.info("Skip to reset consume");
-                    }
-
-                    doRun(consumer);
-                }
-            } catch (Throwable e) {
-                LOGGER.error("do run error maybe pulsar client is configured 
incorrectly: ", e);
-            }
-            running = false;
-        };
-    }
-
-    private void doRun(Consumer<byte[]> consumer) throws PulsarClientException 
{
-        long lastPrintTime = 0;
-        while (isRunnable()) {
-            boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-            if (!suc) {
-                break;
-            }
-            org.apache.pulsar.client.api.Message<byte[]> message = 
consumer.receive(0, TimeUnit.MILLISECONDS);
-            if (ObjectUtils.isEmpty(message)) {
-                if (queue.isEmpty()) {
-                    emptyCount.incrementAndGet();
-                } else {
-                    emptyCount.set(0);
-                }
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-                AgentUtils.silenceSleepInSeconds(1);
-                continue;
-            }
-            emptyCount.set(0);
-            long offset = 0L;
-            SourceData sourceData = new SourceData(message.getValue(), 0L);
-            boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, message.getValue().length);
-            if (!suc4Queue) {
-                break;
-            }
-            putIntoQueue(sourceData);
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-            consumer.acknowledge(message);
-
-            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
-                lastPrintTime = AgentUtils.getCurrentTime();
-                LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
-            }
-        }
+    @Override
+    protected String getThreadName() {
+        return "pulsar-source-" + taskId + "-" + instanceId;
     }
 
-    public boolean isRunnable() {
-        return runnable;
    // No per-batch preparation is needed: the consumer is created once in init().
    @Override
    protected boolean doPrepareToRead() {
        return true;
    }
 
-    private boolean waitForPermit(String permitName, int permitLen) {
-        boolean suc = false;
-        while (!suc) {
-            suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
-            if (!suc) {
-                MemoryManager.getInstance().printDetail(permitName, "log file 
source");
-                if (!isRunnable()) {
-                    return false;
-                }
-                AgentUtils.silenceSleepInSeconds(1);
-            }
+    @Override
+    protected List<SourceData> readFromSource() {
+        List<SourceData> dataList = new ArrayList<>();
+        org.apache.pulsar.client.api.Message<byte[]> message = null;
+        try {
+            message = consumer.receive(0, TimeUnit.MILLISECONDS);
+            offset = message.getSequenceId();
+        } catch (PulsarClientException e) {
+            LOGGER.error("read from pulsar error", e);
         }
-        return true;
+        if (!ObjectUtils.isEmpty(message)) {
+            dataList.add(new SourceData(message.getValue(), 0L));
+        }
+        try {
+            consumer.acknowledge(message);
+        } catch (PulsarClientException e) {
+            LOGGER.error("ack pulsar error", e);
+        }
+        return dataList;
     }
 
-    private void putIntoQueue(SourceData sourceData) {
-        if (sourceData == null) {
-            return;
-        }
+    private Consumer<byte[]> getConsumer() {
+        Consumer<byte[]> consumer = null;
         try {
-            boolean offerSuc = false;
-            if (queue.remainingCapacity() > 0) {
-                while (isRunnable() && !offerSuc) {
-                    offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
-                }
+            consumer = pulsarClient.newConsumer(Schema.BYTES)
+                    .topic(topic)
+                    .subscriptionName(subscription)
+                    
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
+                    
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+                    .subscribe();
+            if (!isRestoreFromDB && timestamp != 0L) {
+                consumer.seek(timestamp);
+                LOGGER.info("Reset consume from {}", timestamp);
+            } else {
+                LOGGER.info("Skip to reset consume");
             }
-
-            if (!offerSuc) {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
+            return consumer;
+        } catch (PulsarClientException | IllegalArgumentException e) {
+            if (consumer == null) {
+                try {
+                    consumer.close();
+                } catch (PulsarClientException ex) {
+                    LOGGER.error("close consumer error", e);
+                }
             }
-            LOGGER.debug("Read {} from pulsar topic {}", sourceData.getData(), 
topic);
-        } catch (InterruptedException e) {
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
-            LOGGER.error("fetchData offer failed {}", e.getMessage());
+            LOGGER.error("get consumer error", e);
         }
+        return null;
     }
 
     @Override
-    public List<Reader> split(TaskProfile conf) {
-        return null;
    protected void printCurrentState() {
        // Periodic progress log: last observed Pulsar sequence id for this topic.
        LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
    }
 
     @Override
-    public Message read() {
-        SourceData sourceData = null;
-        try {
-            sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            LOGGER.warn("poll {} data get interrupted.", topic, e);
-        }
-        if (sourceData == null) {
-            return null;
-        }
-        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
-        Message finalMsg = createMessage(sourceData);
-        return finalMsg;
    protected boolean isRunnable() {
        // The Pulsar source keeps polling until stopRunning() clears the flag.
        return runnable;
    }
 
-    private Message createMessage(SourceData sourceData) {
-        String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
-        Map<String, String> header = new HashMap<>();
-        header.put(PROXY_KEY_DATA, proxyPartitionKey);
-        header.put(OFFSET, sourceData.offset.toString());
-        header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
-
-        long auditTime;
-        if (isRealTime) {
-            auditTime = AgentUtils.getCurrentTime();
-        } else {
-            auditTime = profile.getSinkDataTime();
-        }
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                auditTime, 1, sourceData.data.length);
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
-                AgentUtils.getCurrentTime(), 1, sourceData.data.length);
-        Message finalMsg = new DefaultMessage(sourceData.data, header);
-        if (finalMsg.getBody().length > maxPackSize) {
-            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
-                    finalMsg.getBody().length, maxPackSize);
-            return null;
+    @Override
+    protected void releaseSource() {
+        if (consumer != null) {
+            try {
+                consumer.close();
+            } catch (PulsarClientException e) {
+                LOGGER.error("close consumer error", e);
+            }
         }
-        return finalMsg;
     }
 
     @Override
-    public boolean sourceFinish() {
-        if (isRealTime) {
-            return false;
-        }
-        return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST;
    // Pulsar sources are not split into sub-readers; a single consumer handles the topic.
    public List<Reader> split(TaskProfile conf) {
        return null;
    }
 
    @Override
    public boolean sourceExist() {
        // Always reports the source as present; topic availability is not probed here.
        return true;
    }
-
-    /**
-     * Stop running threads.
-     */
-    public void stopRunning() {
-        runnable = false;
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index 9f3671355e..a844ba38cb 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -49,11 +49,41 @@ public class RedisSource extends AbstractSource {
         return readerList;
     }
 
+    @Override
+    protected String getThreadName() {
+        return null;
+    }
+
+    @Override
+    protected void printCurrentState() {
+
+    }
+
+    @Override
+    protected boolean doPrepareToRead() {
+        return false;
+    }
+
+    @Override
+    protected List<SourceData> readFromSource() {
+        return null;
+    }
+
     @Override
     public Message read() {
         return null;
     }
 
+    @Override
+    protected boolean isRunnable() {
+        return runnable;
+    }
+
+    @Override
+    protected void releaseSource() {
+
+    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index f199f983eb..8fceb8f635 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -48,11 +48,41 @@ public class SQLServerSource extends AbstractSource {
         return readerList;
     }
 
+    @Override
+    protected String getThreadName() {
+        return null;
+    }
+
+    @Override
+    protected void printCurrentState() {
+
+    }
+
+    @Override
+    protected boolean doPrepareToRead() {
+        return false;
+    }
+
+    @Override
+    protected List<SourceData> readFromSource() {
+        return null;
+    }
+
     @Override
     public Message read() {
         return null;
     }
 
+    @Override
+    protected boolean isRunnable() {
+        return runnable;
+    }
+
+    @Override
+    protected void releaseSource() {
+
+    }
+
     @Override
     public boolean sourceFinish() {
         return false;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 5dc79377de..d9e8b9834a 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -17,22 +17,77 @@
 
 package org.apache.inlong.agent.plugin.sources.file;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
+import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
 
 public abstract class AbstractSource implements Source {
 
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    protected class SourceData {
+
+        private byte[] data;
+        private Long offset;
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSource.class);
+
+    protected final Integer BATCH_READ_LINE_COUNT = 10000;
+    protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
+    protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
+    protected final Integer READ_WAIT_TIMEOUT_MS = 10;
+    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+    protected BlockingQueue<SourceData> queue;
+
     protected String inlongGroupId;
     protected String inlongStreamId;
     // metric
@@ -41,11 +96,169 @@ public abstract class AbstractSource implements Source {
     protected String metricName;
     protected Map<String, String> dimensions;
     protected static final AtomicLong METRIX_INDEX = new AtomicLong(0);
+    protected volatile boolean runnable = true;
+    protected volatile boolean running = false;
+    protected String taskId;
+    protected String instanceId;
+    protected InstanceProfile profile;
+    private ExtendedHandler extendedHandler;
+    private boolean isRealTime = false;
+    protected volatile long emptyCount = 0;
+    protected int maxPackSize;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("source-pool"));
+    protected OffsetProfile offsetProfile;
 
     @Override
     public void init(InstanceProfile profile) {
+        this.profile = profile;
+        taskId = profile.getTaskId();
+        instanceId = profile.getInstanceId();
         inlongGroupId = profile.getInlongGroupId();
         inlongStreamId = profile.getInlongStreamId();
+        maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+        queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+        String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+        if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+            isRealTime = true;
+        }
+        initOffset();
+        registerMetric();
+        initExtendHandler();
+    }
+
+    protected void initOffset() {
+        offsetProfile = OffsetManager.getInstance().getOffset(taskId, 
instanceId);
+    }
+
+    @Override
+    public void start() {
+        EXECUTOR_SERVICE.execute(run());
+    }
+
+    private Runnable run() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName());
+            running = true;
+            try {
+                doRun();
+            } catch (Throwable e) {
+                LOGGER.error("do run error maybe file deleted: ", e);
+            }
+            running = false;
+        };
+    }
+
+    private void doRun() {
+        long lastPrintTime = 0;
+        while (isRunnable()) {
+            if (!prepareToRead()) {
+                break;
+            }
+            List<SourceData> lines = readFromSource();
+            if (lines != null && lines.isEmpty()) {
+                if (queue.isEmpty()) {
+                    emptyCount++;
+                } else {
+                    emptyCount = 0;
+                }
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+                AgentUtils.silenceSleepInSeconds(1);
+                continue;
+            }
+            emptyCount = 0;
+            for (int i = 0; i < lines.size(); i++) {
+                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+                if (!suc4Queue) {
+                    break;
+                }
+                putIntoQueue(lines.get(i));
+            }
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
+                lastPrintTime = AgentUtils.getCurrentTime();
+                printCurrentState();
+            }
+        }
+    }
+
+    protected abstract void printCurrentState();
+
+    /**
+     * Before reading the data source, some preparation operations need to be 
done, such as memory control semaphore
+     * application and data source legitimacy verification
+     *
+     * @return true if prepared ok
+     */
+    private boolean prepareToRead() {
+        if (!doPrepareToRead()) {
+            return false;
+        }
+        return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+    }
+
+    /**
+     * Except for applying for memory control semaphores, all other 
preparatory work is implemented by this function
+     *
+     * @return true if prepared ok
+     */
+    protected abstract boolean doPrepareToRead();
+
+    /**
+     * After the preparation work is done, actually read data from the data 
source
+     *
+     * @return source data list
+     */
+    protected abstract List<SourceData> readFromSource();
+
+    private boolean waitForPermit(String permitName, int permitLen) {
+        boolean suc = false;
+        while (!suc) {
+            suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
+            if (!suc) {
+                MemoryManager.getInstance().printDetail(permitName, "source");
+                if (!isRunnable()) {
+                    return false;
+                }
+                AgentUtils.silenceSleepInSeconds(1);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Put the source data into the cache queue; the queue permit is released 
if the offer fails
+     */
+    private void putIntoQueue(SourceData sourceData) {
+        if (sourceData == null) {
+            return;
+        }
+        try {
+            boolean offerSuc = false;
+            while (isRunnable() && !offerSuc) {
+                offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+            }
+            if (!offerSuc) {
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.getData().length);
+            }
+            LOGGER.debug("Read {} from source {}", sourceData.getData(), 
inlongGroupId);
+        } catch (InterruptedException e) {
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.getData().length);
+            LOGGER.error("fetchData offer failed", e);
+        }
+    }
+
+    /**
+     * Returns the name of the thread, used to identify different data sources
+     *
+     * @return the thread name
+     */
+    protected abstract String getThreadName();
+
+    private void registerMetric() {
         // register metric
         this.dimensions = new HashMap<>();
         dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
@@ -58,7 +271,129 @@ public abstract class AbstractSource implements Source {
         sourceMetric = metricItemSet.findMetricItem(dimensions);
     }
 
+    private void initExtendHandler() {
+        if 
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
 != 0) {
+            Constructor<?> constructor =
+                    null;
+            try {
+                constructor = Class.forName(
+                        profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, 
DEFAULT_FILE_SOURCE_EXTEND_CLASS))
+                        .getDeclaredConstructor(InstanceProfile.class);
+            } catch (NoSuchMethodException e) {
+                LOGGER.error("init {} NoSuchMethodException error", 
instanceId, e);
+            } catch (ClassNotFoundException e) {
+                LOGGER.error("init {} ClassNotFoundException error", 
instanceId, e);
+            }
+            constructor.setAccessible(true);
+            try {
+                extendedHandler = (ExtendedHandler) 
constructor.newInstance(profile);
+            } catch (InstantiationException e) {
+                LOGGER.error("init {} InstantiationException error", 
instanceId, e);
+            } catch (IllegalAccessException e) {
+                LOGGER.error("init {} IllegalAccessException error", 
instanceId, e);
+            } catch (InvocationTargetException e) {
+                LOGGER.error("init {} InvocationTargetException error", 
instanceId, e);
+            }
+        }
+    }
+
+    @Override
+    public Message read() {
+        SourceData sourceData = null;
+        try {
+            sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.warn("poll {} data get interrupted.", instanceId);
+        }
+        if (sourceData == null) {
+            return null;
+        }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.getData().length);
+        return createMessage(sourceData);
+    }
+
+    private Message createMessage(SourceData sourceData) {
+        String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
+        Map<String, String> header = new HashMap<>();
+        header.put(PROXY_KEY_DATA, proxyPartitionKey);
+        header.put(OFFSET, sourceData.getOffset().toString());
+        header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+        if (extendedHandler != null) {
+            extendedHandler.dealWithHeader(header, sourceData.getData());
+        }
+        long auditTime = 0;
+        if (isRealTime) {
+            auditTime = AgentUtils.getCurrentTime();
+        } else {
+            auditTime = profile.getSinkDataTime();
+        }
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
+                auditTime, 1, sourceData.getData().length);
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+                AgentUtils.getCurrentTime(), 1, sourceData.getData().length);
+        Message finalMsg = new DefaultMessage(sourceData.getData(), header);
+        // if the message size is greater than the max pack size, drop it
+        if (finalMsg.getBody().length > maxPackSize) {
+            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
+                    finalMsg.getBody().length, maxPackSize);
+            return null;
+        }
+        return finalMsg;
+    }
+
+    /**
+     * Whether the thread can keep running in the while loop.
+     *
+     * @return true if threads can run
+     */
+    protected abstract boolean isRunnable();
+
+    /**
+     * Stop running threads.
+     */
+    public void stopRunning() {
+        runnable = false;
+    }
+
     public void destroy() {
+        LOGGER.info("destroy read source name {}", instanceId);
+        stopRunning();
+        while (running) {
+            AgentUtils.silenceSleepInMs(1);
+        }
+        clearQueue(queue);
+        releaseSource();
+        LOGGER.info("destroy read source name {} end", instanceId);
+    }
+
+    /**
+     * Release the source resource if needed
+     */
+    protected abstract void releaseSource();
 
+    private void clearQueue(BlockingQueue<SourceData> queue) {
+        if (queue == null) {
+            return;
+        }
+        while (queue != null && !queue.isEmpty()) {
+            SourceData sourceData = null;
+            try {
+                sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOGGER.warn("poll {} data get interrupted.", instanceId, e);
+            }
+            if (sourceData != null) {
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.getData().length);
+            }
+        }
+        queue.clear();
+    }
+
+    @Override
+    public boolean sourceFinish() {
+        if (isRealTime) {
+            return false;
+        }
+        return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java
deleted file mode 100644
index 63b0ae32ba..0000000000
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader.file;
-
-import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.sources.LogFileSource;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
-import org.apache.inlong.agent.plugin.utils.PluginUtils;
-
-import com.google.gson.Gson;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.dsl.MixedOperation;
-import io.fabric8.kubernetes.client.dsl.PodResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Objects;
-
-import static 
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
-import static 
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
-import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
-import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_CONTAINER_ID;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_CONTAINER_NAME;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_NAMESPACE;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_LABEL;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_NAME;
-import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_UID;
-
-/**
- * k8s file reader
- */
-public final class KubernetesMetadataProvider {
-
-    private static final Logger log = 
LoggerFactory.getLogger(KubernetesMetadataProvider.class);
-    private static final Gson GSON = new Gson();
-
-    private KubernetesClient client;
-    private LogFileSource fileReaderOperator;
-
-    public KubernetesMetadataProvider(LogFileSource fileReaderOperator) {
-        this.fileReaderOperator = fileReaderOperator;
-    }
-
-    public void getData() {
-        if (Objects.nonNull(client) && 
Objects.nonNull(fileReaderOperator.metadata)) {
-            return;
-        }
-        try {
-            client = PluginUtils.getKubernetesClient();
-        } catch (IOException e) {
-            log.error("get k8s client error: ", e);
-        }
-        getK8sMetadata(fileReaderOperator.profile);
-    }
-
-    /**
-     * get PODS of kubernetes information
-     */
-    public PodList getPods() {
-        if (Objects.isNull(client)) {
-            return null;
-        }
-        MixedOperation<Pod, PodList, PodResource> pods = client.pods();
-        return pods.list();
-    }
-
-    /**
-     * get pod metadata by namespace and pod name
-     */
-    public void getK8sMetadata(InstanceProfile jobConf) {
-        if (Objects.isNull(jobConf)) {
-            return;
-        }
-        Map<String, String> k8sInfo = 
MetaDataUtils.getLogInfo(fileReaderOperator.getFile().getName());
-        log.info("file name is: {}, k8s information size: {}", 
fileReaderOperator.getFile().getName(), k8sInfo.size());
-        if (k8sInfo.isEmpty()) {
-            return;
-        }
-
-        Map<String, String> metadata = fileReaderOperator.metadata;
-        metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
-        metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
-        metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
-        metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
-
-        PodResource podResource = 
client.pods().inNamespace(k8sInfo.get(NAMESPACE))
-                .withName(k8sInfo.get(POD_NAME));
-        if (Objects.isNull(podResource)) {
-            return;
-        }
-        Pod pod = podResource.get();
-        PodList podList = 
client.pods().inNamespace(k8sInfo.get(NAMESPACE)).list();
-        podList.getItems().forEach(data -> {
-            if (data.equals(pod)) {
-                metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
-                metadata.put(METADATA_POD_LABEL, 
GSON.toJson(pod.getMetadata().getLabels()));
-            }
-        });
-        return;
-    }
-}
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index fb404dea0a..a9d683750c 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -94,6 +94,7 @@ public class TestLogFileSource {
                 OffsetManager.getInstance().setOffset(offsetProfile);
             }
             source.init(instanceProfile);
+            source.start();
             return source;
         } catch (Exception e) {
             LOGGER.error("source init error {}", e);


Reply via email to