This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6b5ab82ce0 [INLONG-9982][Agent] Adjusting the abstraction of source code to facilitate rapid addition of sources (#9984)
6b5ab82ce0 is described below
commit 6b5ab82ce0da2023c545c6a4d89b7febb3b92b3a
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Apr 16 18:35:30 2024 +0800
[INLONG-9982][Agent] Adjusting the abstraction of source code to facilitate rapid addition of sources (#9984)
* [INLONG-9982][Agent] Adjusting the abstraction of source code to facilitate rapid addition of sources
* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
Co-authored-by: AloysZhang <[email protected]>
* [INLONG-9982][Agent] Modify based on comments
* [INLONG-9982][Agent] Modify based on comments
* [INLONG-9982][Agent] Modify based on comments
---------
Co-authored-by: AloysZhang <[email protected]>
---
.../apache/inlong/agent/plugin/file/Source.java | 6 +
.../agent/plugin/sources/DatabaseSqlSource.java | 30 ++
.../inlong/agent/plugin/sources/KafkaSource.java | 287 +++-----------
.../inlong/agent/plugin/sources/LogFileSource.java | 430 ++++-----------------
.../inlong/agent/plugin/sources/MongoDBSource.java | 30 ++
.../inlong/agent/plugin/sources/MqttSource.java | 30 ++
.../inlong/agent/plugin/sources/OracleSource.java | 30 ++
.../agent/plugin/sources/PostgreSQLSource.java | 30 ++
.../inlong/agent/plugin/sources/PulsarSource.java | 277 ++++---------
.../inlong/agent/plugin/sources/RedisSource.java | 30 ++
.../agent/plugin/sources/SQLServerSource.java | 30 ++
.../agent/plugin/sources/file/AbstractSource.java | 335 ++++++++++++++++
.../reader/file/KubernetesMetadataProvider.java | 121 ------
.../agent/plugin/sources/TestLogFileSource.java | 1 +
14 files changed, 751 insertions(+), 916 deletions(-)
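For context on the refactor, the following is a minimal, self-contained sketch of the template-method shape the diff below introduces: AbstractSource owns the worker thread and the shared read loop, and a concrete source only fills in a handful of hooks. Only the hook names (getThreadName, doPrepareToRead, readFromSource, printCurrentState, isRunnable, releaseSource) are taken from the diff; the SketchAbstractSource/ExampleSource classes and the simplified loop are illustrative assumptions, not the actual implementation.

    // Minimal sketch only: not the real AbstractSource, and the loop omits the
    // queueing, memory-permit and audit logic that the real base class keeps.
    import java.util.Collections;
    import java.util.List;

    abstract class SketchAbstractSource {
        // Hooks a concrete source must implement (names follow the diff below).
        protected abstract String getThreadName();
        protected abstract boolean doPrepareToRead();
        protected abstract List<byte[]> readFromSource();
        protected abstract void printCurrentState();
        protected abstract boolean isRunnable();
        protected abstract void releaseSource();

        // The base class owns the worker thread and the shared read loop.
        public void start() {
            Thread worker = new Thread(() -> {
                while (isRunnable() && doPrepareToRead()) {
                    List<byte[]> batch = readFromSource();
                    // the real class queues the batch and reports audit metrics here
                    printCurrentState();
                }
                releaseSource();
            }, getThreadName());
            worker.start();
        }
    }

    // Adding a new source now means implementing the hooks instead of copying the loop.
    class ExampleSource extends SketchAbstractSource {
        protected String getThreadName() { return "example-source"; }
        protected boolean doPrepareToRead() { return true; }
        protected List<byte[]> readFromSource() { return Collections.emptyList(); }
        protected void printCurrentState() { }
        protected boolean isRunnable() { return false; }
        protected void releaseSource() { }
    }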
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
index c65f305227..26a5ec733d 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java
@@ -50,6 +50,12 @@ public interface Source {
*/
void init(InstanceProfile profile);
+ /**
+ * Executed after init, usually used to start the source's worker thread
+ *
+ */
+ void start();
+
/**
* destroy
*/
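The new start() method separates thread startup from configuration. A hypothetical caller-side sketch of the intended ordering, assuming only what the Source interface above declares (init, start, destroy); the runSource helper shown here is made up for illustration:

    // Hypothetical caller; shows ordering only, not the real instance/task wiring.
    void runSource(Source source, InstanceProfile profile) {
        source.init(profile);   // configure the source from the instance profile
        source.start();         // executed after init: spins up the source's worker thread
        // ... messages are consumed via read() while the instance runs ...
        source.destroy();       // stop the worker and release resources when the instance ends
    }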
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 227dd375ef..2df77b4f83 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -88,11 +88,41 @@ public class DatabaseSqlSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index fe31730fdc..42e6d7c70f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -17,23 +17,12 @@
package org.apache.inlong.agent.plugin.sources;
-import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.except.FileException;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.utils.AgentUtils;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,23 +40,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
@@ -79,45 +53,17 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_PARTITIO
*/
public class KafkaSource extends AbstractSource {
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- private class SourceData {
-
- private byte[] data;
- private Long offset;
- }
-
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSource.class);
- private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
- 0, Integer.MAX_VALUE,
- 1L, TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- new AgentThreadFactory("kafka-source"));
- private BlockingQueue<SourceData> queue;
- public InstanceProfile profile;
- private int maxPackSize;
- private String taskId;
- private String instanceId;
private String topic;
private Properties props = new Properties();
private String allPartitionOffsets;
Map<Integer, Long> partitionOffsets = new HashMap<>();
- private volatile boolean running = false;
- private volatile boolean runnable = true;
- private volatile AtomicLong emptyCount = new AtomicLong(0);
-
- private final Integer CACHE_QUEUE_SIZE = 100000;
- private final Integer READ_WAIT_TIMEOUT_MS = 10;
- private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
- private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
-
private static final String KAFKA_DESERIALIZER_METHOD =
"org.apache.kafka.common.serialization.ByteArrayDeserializer";
private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
- private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
- private boolean isRealTime = false;
private boolean isRestoreFromDB = false;
+ private KafkaConsumer<String, byte[]> kafkaConsumer;
+ private long offset = 0L;
public KafkaSource() {
}
@@ -128,15 +74,6 @@ public class KafkaSource extends AbstractSource {
LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
this.profile = profile;
super.init(profile);
- String cycleUnit = profile.get(TASK_CYCLE_UNIT);
- if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- isRealTime = true;
- cycleUnit = CycleUnitType.HOUR;
- }
- queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
- taskId = profile.getTaskId();
- instanceId = profile.getInstanceId();
topic = profile.getInstanceId();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
@@ -155,141 +92,82 @@ public class KafkaSource extends AbstractSource {
Long.valueOf(offset.split(TASK_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
}
}
-
- EXECUTOR_SERVICE.execute(run());
+ kafkaConsumer = getKafkaConsumer();
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + topic, ex);
}
}
- private Runnable run() {
- return () -> {
- AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" +
instanceId);
- running = true;
- try {
- List<PartitionInfo> partitionInfoList;
- try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props)) {
- partitionInfoList = consumer.partitionsFor(topic);
- }
-
- props.put(KAFKA_SESSION_TIMEOUT, 30000);
-
- try (KafkaConsumer<String, byte[]> kafkaConsumer = new
KafkaConsumer<>(props)) {
- if (null != partitionInfoList) {
- List<TopicPartition> topicPartitions = new
ArrayList<>();
- for (PartitionInfo partitionInfo : partitionInfoList) {
- TopicPartition topicPartition = new
TopicPartition(partitionInfo.topic(),
- partitionInfo.partition());
- topicPartitions.add(topicPartition);
- }
- kafkaConsumer.assign(topicPartitions);
-
- if (!isRestoreFromDB &&
StringUtils.isNotBlank(allPartitionOffsets)) {
- for (TopicPartition topicPartition :
topicPartitions) {
- Long offset =
partitionOffsets.get(topicPartition.partition());
- if (ObjectUtils.isNotEmpty(offset)) {
- kafkaConsumer.seek(topicPartition, offset);
- }
- }
- } else {
- LOGGER.info("Skip to seek offset");
- }
- }
- doRun(kafkaConsumer);
- }
- } catch (Throwable e) {
- LOGGER.error("do run error maybe topic is configured
incorrectly: ", e);
- }
- running = false;
- };
+ @Override
+ protected String getThreadName() {
+ return "kafka-source-" + taskId + "-" + instanceId;
}
- private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
- long lastPrintTime = 0;
- while (isRunnable()) {
- boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- if (!suc) {
- break;
- }
- ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(Duration.ofMillis(1000));
- if (records.isEmpty()) {
- if (queue.isEmpty()) {
- emptyCount.incrementAndGet();
- } else {
- emptyCount.set(0);
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- AgentUtils.silenceSleepInSeconds(1);
- continue;
- }
- emptyCount.set(0);
- long offset = 0L;
- for (ConsumerRecord<String, byte[]> record : records) {
- SourceData sourceData = new SourceData(record.value(),
record.offset());
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(sourceData);
- offset = record.offset();
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- kafkaConsumer.commitSync();
-
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
- }
- }
+ @Override
+ protected boolean doPrepareToRead() {
+ return true;
}
- private boolean waitForPermit(String permitName, int permitLen) {
- boolean suc = false;
- while (!suc) {
- suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
- if (!suc) {
- MemoryManager.getInstance().printDetail(permitName, "log file
source");
- if (!isRunnable()) {
- return false;
- }
- AgentUtils.silenceSleepInSeconds(1);
- }
+ @Override
+ protected List<SourceData> readFromSource() {
+ List<SourceData> dataList = new ArrayList<>();
+ ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, byte[]> record : records) {
+ SourceData sourceData = new SourceData(record.value(),
record.offset());
+ dataList.add(sourceData);
+ offset = record.offset();
}
- return true;
+ kafkaConsumer.commitSync();
+ return dataList;
}
- private void putIntoQueue(SourceData sourceData) {
- if (sourceData == null) {
- return;
- }
+ private KafkaConsumer<String, byte[]> getKafkaConsumer() {
+ List<PartitionInfo> partitionInfoList;
+ KafkaConsumer<String, byte[]> kafkaConsumer = null;
+ props.put(KAFKA_SESSION_TIMEOUT, 30000);
try {
- boolean offerSuc = false;
- if (queue.remainingCapacity() > 0) {
- while (isRunnable() && !offerSuc) {
- offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+ kafkaConsumer = new KafkaConsumer<>(props);
+ partitionInfoList = kafkaConsumer.partitionsFor(topic);
+ if (partitionInfoList == null) {
+ kafkaConsumer.close();
+ return null;
+ }
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ TopicPartition topicPartition = new
TopicPartition(partitionInfo.topic(),
+ partitionInfo.partition());
+ topicPartitions.add(topicPartition);
+ }
+ kafkaConsumer.assign(topicPartitions);
+ if (!isRestoreFromDB &&
StringUtils.isNotBlank(allPartitionOffsets)) {
+ for (TopicPartition topicPartition : topicPartitions) {
+ Long offset =
partitionOffsets.get(topicPartition.partition());
+ if (ObjectUtils.isNotEmpty(offset)) {
+ kafkaConsumer.seek(topicPartition, offset);
+ }
}
+ } else {
+ LOGGER.info("Skip to seek offset");
}
-
- if (!offerSuc) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
+ return kafkaConsumer;
+ } catch (Exception e) {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
}
- LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(),
topic);
- } catch (InterruptedException e) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
- LOGGER.error("fetchData offer failed {}", e.getMessage());
+ LOGGER.error("get kafka consumer error", e);
}
+ return null;
}
- public boolean isRunnable() {
- return runnable;
+ @Override
+ protected void printCurrentState() {
+ LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
}
- /**
- * Stop running threads.
- */
- public void stopRunning() {
- runnable = false;
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
}
@Override
@@ -298,57 +176,14 @@ public class KafkaSource extends AbstractSource {
}
@Override
- public Message read() {
- SourceData sourceData = null;
- try {
- sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOGGER.warn("poll {} data get interrupted.", topic, e);
- }
- if (sourceData == null) {
- return null;
- }
- MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
- Message finalMsg = createMessage(sourceData);
- return finalMsg;
- }
-
- private Message createMessage(SourceData sourceData) {
- String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
- Map<String, String> header = new HashMap<>();
- header.put(PROXY_KEY_DATA, proxyPartitionKey);
- header.put(OFFSET, sourceData.offset.toString());
- header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
-
- long auditTime = 0;
- if (isRealTime) {
- auditTime = AgentUtils.getCurrentTime();
- } else {
- auditTime = profile.getSinkDataTime();
- }
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- auditTime, 1, sourceData.data.length);
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
- AgentUtils.getCurrentTime(), 1, sourceData.data.length);
- Message finalMsg = new DefaultMessage(sourceData.data, header);
- if (finalMsg.getBody().length > maxPackSize) {
- LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
- finalMsg.getBody().length, maxPackSize);
- return null;
- }
- return finalMsg;
+ public boolean sourceExist() {
+ return true;
}
@Override
- public boolean sourceFinish() {
- if (isRealTime) {
- return false;
+ protected void releaseSource() {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
}
- return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST;
- }
-
- @Override
- public boolean sourceExist() {
- return true;
}
}
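With the queueing, permit and audit logic moved into AbstractSource, the KafkaSource above is reduced to building a consumer and implementing readFromSource(). The standalone sketch below shows the assign/poll/commit flow those methods rely on, using only the standard kafka-clients API; the broker address, group id, topic and the pollOnce method name are placeholders, and the snippet assumes the same org.apache.kafka.clients imports (plus java.time.Duration) that this file already uses.

    // Standalone sketch; not the InLong implementation.
    void pollOnce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            List<PartitionInfo> infos = consumer.partitionsFor("example-topic");
            if (infos == null) {
                return;                                    // topic does not exist
            }
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);                   // manual assignment, as in getKafkaConsumer()
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, byte[]> record : records) {
                byte[] payload = record.value();           // wrapped into SourceData by readFromSource()
                long offset = record.offset();             // reported by printCurrentState()
            }
            consumer.commitSync();                         // committed after the batch is handed over
        }
    }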
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 9f83f6fc06..037470fb41 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -17,32 +17,17 @@
package org.apache.inlong.agent.plugin.sources;
-import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.task.MemoryManager;
-import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.except.FileException;
-import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
-import
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DateTransUtils;
-import com.google.gson.Gson;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,93 +38,28 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.RandomAccessFile;
-import java.lang.reflect.Constructor;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
-import static org.apache.inlong.agent.constant.MetadataConstants.DATA_CONTENT;
-import static
org.apache.inlong.agent.constant.MetadataConstants.DATA_CONTENT_TIME;
-import static org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_FILE_NAME;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_NAME;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
-import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
-import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_META_ENV_LIST;
/**
* Read text files
*/
public class LogFileSource extends AbstractSource {
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- private class SourceData {
-
- private String data;
- private Long offset;
- }
-
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileSource.class);
- private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
- 0, Integer.MAX_VALUE,
- 1L, TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- new AgentThreadFactory("log-file-source"));
- private final Integer BATCH_READ_LINE_COUNT = 10000;
- private final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
- private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
- private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
- private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
- private final Integer READ_WAIT_TIMEOUT_MS = 10;
- private final SimpleDateFormat RECORD_TIME_FORMAT = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- public InstanceProfile profile;
- private String taskId;
- private String instanceId;
- private int maxPackSize;
+
private String fileName;
private File file;
private byte[] bufferToReadFile;
public volatile long linePosition = 0;
public volatile long bytePosition = 0;
- private boolean needMetadata = false;
- public Map<String, String> metadata;
private boolean isIncrement = false;
- private BlockingQueue<SourceData> queue;
- private final Gson GSON = new Gson();
- private volatile boolean runnable = true;
private volatile boolean fileExist = true;
private String inodeInfo;
private volatile long lastInodeUpdateTime = 0;
- private volatile boolean running = false;
- private long dataTime = 0;
- private volatile long emptyCount = 0;
- private ExtendedHandler extendedHandler;
- private boolean isRealTime = false;
+ private RandomAccessFile randomAccessFile;
public LogFileSource() {
}
@@ -148,17 +68,8 @@ public class LogFileSource extends AbstractSource {
public void init(InstanceProfile profile) {
try {
LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
- this.profile = profile;
super.init(profile);
- String cycleUnit = profile.get(TASK_CYCLE_UNIT);
- if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- isRealTime = true;
- cycleUnit = CycleUnitType.HOUR;
- }
- taskId = profile.getTaskId();
- instanceId = profile.getInstanceId();
fileName = profile.getInstanceId();
- maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
isIncrement = isIncrement(profile);
file = new File(fileName);
@@ -166,28 +77,65 @@ public class LogFileSource extends AbstractSource {
lastInodeUpdateTime = AgentUtils.getCurrentTime();
linePosition = getInitLineOffset(isIncrement, taskId, instanceId,
inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
- queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- dataTime =
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit);
- if
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
!= 0) {
- Constructor<?> constructor =
- Class.forName(
-
profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS,
DEFAULT_FILE_SOURCE_EXTEND_CLASS))
- .getDeclaredConstructor(InstanceProfile.class);
- constructor.setAccessible(true);
- extendedHandler = (ExtendedHandler)
constructor.newInstance(profile);
- }
- try {
- registerMeta(profile);
- } catch (Exception ex) {
- LOGGER.error("init metadata error", ex);
- }
- EXECUTOR_SERVICE.execute(run());
+ randomAccessFile = new RandomAccessFile(file, "r");
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + file.getPath(),
ex);
}
}
+ @Override
+ protected boolean doPrepareToRead() {
+ if (isInodeChanged()) {
+ fileExist = false;
+ LOGGER.info("inode changed, instance will restart and offset will
be clean, file {}",
+ fileName);
+ return false;
+ }
+ if (file.length() < bytePosition) {
+ fileExist = false;
+ LOGGER.info("file rotate, instance will restart and offset will be
clean, file {}",
+ fileName);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ try {
+ return readFromPos(bytePosition);
+ } catch (FileNotFoundException e) {
+ fileExist = false;
+ LOGGER.error("readFromPos file deleted error: ", e);
+ } catch (IOException e) {
+ LOGGER.error("readFromPos error: ", e);
+ }
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+ LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len
{}", file.getName(), linePosition,
+ bytePosition, file.length());
+ }
+
+ @Override
+ protected String getThreadName() {
+ return "log-file-source-" + taskId + "-" + fileName;
+ }
+
+ private List<SourceData> readFromPos(long pos) throws IOException {
+ List<byte[]> lines = new ArrayList<>();
+ List<SourceData> dataList = new ArrayList<>();
+ bytePosition = readLines(randomAccessFile, pos, lines,
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
+ for (int i = 0; i < lines.size(); i++) {
+ linePosition++;
+ dataList.add(new SourceData(lines.get(i), linePosition));
+ }
+ return dataList;
+ }
+
private int getRealLineCount(String fileName) {
try (LineNumberReader lineNumberReader = new LineNumberReader(new
FileReader(instanceId))) {
lineNumberReader.skip(Long.MAX_VALUE);
@@ -199,7 +147,6 @@ public class LogFileSource extends AbstractSource {
}
private long getInitLineOffset(boolean isIncrement, String taskId, String
instanceId, String inodeInfo) {
- OffsetProfile offsetProfile =
OffsetManager.getInstance().getOffset(taskId, instanceId);
long offset = 0;
if (offsetProfile != null &&
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
offset = offsetProfile.getOffset();
@@ -229,24 +176,6 @@ public class LogFileSource extends AbstractSource {
return file;
}
- public void registerMeta(InstanceProfile jobConf) {
- if (!jobConf.hasKey(TASK_FILE_META_ENV_LIST)) {
- return;
- }
- String[] env = jobConf.get(TASK_FILE_META_ENV_LIST).split(COMMA);
- Arrays.stream(env).forEach(data -> {
- if (data.equalsIgnoreCase(KUBERNETES)) {
- needMetadata = true;
- new KubernetesMetadataProvider(this).getData();
- } else if (data.equalsIgnoreCase(ENV_CVM)) {
- needMetadata = true;
- metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
- metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
- metadata.put(METADATA_FILE_NAME, file.getName());
- }
- });
- }
-
private boolean isIncrement(InstanceProfile profile) {
if (profile.hasKey(TaskConstants.TASK_FILE_CONTENT_COLLECT_TYPE) &&
DataCollectType.INCREMENT
.equalsIgnoreCase(profile.get(TaskConstants.TASK_FILE_CONTENT_COLLECT_TYPE))) {
@@ -262,7 +191,7 @@ public class LogFileSource extends AbstractSource {
try {
input = new RandomAccessFile(file, "r");
while (readCount < linePosition) {
- List<String> lines = new ArrayList<>();
+ List<byte[]> lines = new ArrayList<>();
pos = readLines(input, pos, lines, Math.min((int)
(linePosition - readCount), BATCH_READ_LINE_COUNT),
BATCH_READ_LINE_TOTAL_LEN, true);
readCount += lines.size();
@@ -289,7 +218,7 @@ public class LogFileSource extends AbstractSource {
* @return The new position after the lines have been read
* @throws IOException if an I/O error occurs.
*/
- private long readLines(RandomAccessFile reader, long pos, List<String>
lines, int maxLineCount, int maxLineTotalLen,
+ private long readLines(RandomAccessFile reader, long pos, List<byte[]>
lines, int maxLineCount, int maxLineTotalLen,
boolean isCounting)
throws IOException {
if (maxLineCount == 0) {
@@ -309,11 +238,10 @@ public class LogFileSource extends AbstractSource {
switch (ch) {
case '\n':
if (isCounting) {
- lines.add(new String(""));
+ lines.add(null);
} else {
- String temp = new String(baos.toByteArray(),
StandardCharsets.UTF_8);
- lines.add(temp);
- lineTotalLen += temp.length();
+ lines.add(baos.toByteArray());
+ lineTotalLen += baos.size();
}
rePos = pos + i + 1;
if (overLen) {
@@ -350,79 +278,6 @@ public class LogFileSource extends AbstractSource {
return rePos;
}
- @Override
- public Message read() {
- SourceData sourceData = null;
- try {
- sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOGGER.warn("poll {} data get interrupted.", file.getPath(), e);
- }
- if (sourceData == null) {
- return null;
- }
- MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length());
- Message finalMsg = createMessage(sourceData);
- return finalMsg;
- }
-
- private Message createMessage(SourceData sourceData) {
- String msgWithMetaData = fillMetaData(sourceData.data);
- String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
- Map<String, String> header = new HashMap<>();
- header.put(PROXY_KEY_DATA, proxyPartitionKey);
- header.put(OFFSET, sourceData.offset.toString());
- header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
- if (extendedHandler != null) {
- extendedHandler.dealWithHeader(header,
sourceData.getData().getBytes(StandardCharsets.UTF_8));
- }
- long auditTime = 0;
- if (isRealTime) {
- auditTime = AgentUtils.getCurrentTime();
- } else {
- auditTime = profile.getSinkDataTime();
- }
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- auditTime, 1, msgWithMetaData.length());
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
- AgentUtils.getCurrentTime(), 1, msgWithMetaData.length());
- Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
- // if the message size is greater than max pack size,should drop it.
- if (finalMsg.getBody().length > maxPackSize) {
- LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
- finalMsg.getBody().length, maxPackSize);
- return null;
- }
- return finalMsg;
- }
-
- public String fillMetaData(String message) {
- if (!needMetadata) {
- return message;
- }
- long timestamp = System.currentTimeMillis();
- boolean isJson = FileDataUtils.isJSON(message);
- Map<String, String> mergeData = new HashMap<>(metadata);
- mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(message,
isJson));
- mergeData.put(DATA_CONTENT_TIME, RECORD_TIME_FORMAT.format(new
Date(timestamp)));
- return GSON.toJson(mergeData);
- }
-
- private boolean waitForPermit(String permitName, int permitLen) {
- boolean suc = false;
- while (!suc) {
- suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
- if (!suc) {
- MemoryManager.getInstance().printDetail(permitName, "log file
source");
- if (isInodeChanged() || !isRunnable()) {
- return false;
- }
- AgentUtils.silenceSleepInSeconds(1);
- }
- }
- return true;
- }
-
private boolean isInodeChanged() {
if (AgentUtils.getCurrentTime() - lastInodeUpdateTime >
INODE_UPDATE_INTERVAL_MS) {
try {
@@ -435,159 +290,9 @@ public class LogFileSource extends AbstractSource {
return false;
}
- private Runnable run() {
- return () -> {
- AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" +
file);
- running = true;
- try {
- doRun();
- } catch (Throwable e) {
- LOGGER.error("do run error maybe file deleted: ", e);
- }
- running = false;
- };
- }
-
- private void doRun() {
- long lastPrintTime = 0;
- while (isRunnable() && fileExist) {
- if (isInodeChanged()) {
- fileExist = false;
- LOGGER.info("inode changed, instance will restart and offset
will be clean, file {}",
- fileName);
- break;
- }
- if (file.length() < bytePosition) {
- fileExist = false;
- LOGGER.info("file rotate, instance will restart and offset
will be clean, file {}",
- fileName);
- break;
- }
- boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
- if (!suc) {
- break;
- }
- List<SourceData> lines = null;
- try {
- lines = readFromPos(bytePosition);
- } catch (FileNotFoundException e) {
- fileExist = false;
- LOGGER.error("readFromPos file deleted error: ", e);
- } catch (IOException e) {
- LOGGER.error("readFromPos error: ", e);
- }
- if (lines.isEmpty()) {
- if (queue.isEmpty()) {
- emptyCount++;
- } else {
- emptyCount = 0;
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
- AgentUtils.silenceSleepInSeconds(1);
- continue;
- }
- emptyCount = 0;
- for (int i = 0; i < lines.size(); i++) {
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(lines.get(i));
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("path is {}, linePosition {}, bytePosition is {}
file len {}, reads lines size {}",
- file.getName(), linePosition, bytePosition,
file.length(), lines.size());
- }
- }
- }
-
- private void putIntoQueue(SourceData sourceData) {
- if (sourceData == null) {
- return;
- }
- try {
- boolean offerSuc = false;
- while (isRunnable() && offerSuc != true) {
- offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
- }
- if (!offerSuc) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length());
- }
- LOGGER.debug("Read {} from file {}", sourceData.getData(),
fileName);
- } catch (InterruptedException e) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length());
- LOGGER.error("fetchData offer failed {}", e.getMessage());
- }
- }
-
- /**
- * Whether threads can in running state with while loop.
- *
- * @return true if threads can run
- */
- public boolean isRunnable() {
- return runnable;
- }
-
- /**
- * Stop running threads.
- */
- public void stopRunning() {
- runnable = false;
- }
-
- private List<SourceData> readFromPos(long pos) throws IOException {
- List<String> lines = new ArrayList<>();
- List<SourceData> dataList = new ArrayList<>();
- RandomAccessFile input = new RandomAccessFile(file, "r");
- bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT,
BATCH_READ_LINE_TOTAL_LEN, false);
- for (int i = 0; i < lines.size(); i++) {
- linePosition++;
- dataList.add(new SourceData(lines.get(i), linePosition));
- }
- if (input != null) {
- input.close();
- }
- return dataList;
- }
-
@Override
- public void destroy() {
- LOGGER.info("destroy read source name {}", fileName);
- stopRunning();
- while (running) {
- AgentUtils.silenceSleepInMs(1);
- }
- clearQueue(queue);
- LOGGER.info("destroy read source name {} end", fileName);
- }
-
- private void clearQueue(BlockingQueue<SourceData> queue) {
- if (queue == null) {
- return;
- }
- while (queue != null && !queue.isEmpty()) {
- SourceData sourceData = null;
- try {
- sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOGGER.warn("poll {} data get interrupted.", file.getPath(),
e);
- }
- if (sourceData != null) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length());
- }
- }
- queue.clear();
- }
-
- @Override
- public boolean sourceFinish() {
- if (isRealTime) {
- return false;
- }
- return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
+ protected boolean isRunnable() {
+ return runnable && fileExist && !isInodeChanged();
}
@Override
@@ -599,4 +304,15 @@ public class LogFileSource extends AbstractSource {
public List<Reader> split(TaskProfile jobConf) {
return null;
}
+
+ @Override
+ protected void releaseSource() {
+ if (randomAccessFile != null) {
+ try {
+ randomAccessFile.close();
+ } catch (IOException e) {
+ LOGGER.error("close randomAccessFile error", e);
+ }
+ }
+ }
}
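The LogFileSource above now keeps a single RandomAccessFile open, and readFromPos() advances bytePosition/linePosition from the last complete line. A simplified, standalone sketch of that position-based reading (ignoring the 64 KB read buffer and the batch limits of the real readLines()) might look like the following; the readLinesSimple helper name is made up for illustration, and the usual java.io/java.util imports are assumed.

    // Simplified illustration only: reads byte-by-byte instead of using a buffer.
    static long readLinesSimple(RandomAccessFile file, long pos, List<byte[]> lines) throws IOException {
        file.seek(pos);
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        long bytesRead = 0;
        long resumePos = pos;                      // only advanced on complete lines
        int ch;
        while ((ch = file.read()) != -1) {
            bytesRead++;
            if (ch == '\n') {
                lines.add(current.toByteArray());  // one SourceData per line in the real source
                current.reset();
                resumePos = pos + bytesRead;       // a partial trailing line is re-read next call
            } else if (ch != '\r') {
                current.write(ch);
            }
        }
        return resumePos;                          // next bytePosition to read from
    }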
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index 2f96fe66a2..555d87b412 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -44,11 +44,41 @@ public class MongoDBSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index bb333c2d27..5f14cf3027 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -79,11 +79,41 @@ public class MqttSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
index d4c86e7634..1d3088a0aa 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -48,11 +48,41 @@ public class OracleSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
index 58f03cfa9b..7a65767c1b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -49,11 +49,41 @@ public class PostgreSQLSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index c64653e2a3..64974d7994 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -17,23 +17,12 @@
package org.apache.inlong.agent.plugin.sources;
-import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.except.FileException;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.utils.AgentUtils;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -44,26 +33,11 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_RESET_TIME;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SERVICE_URL;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION;
@@ -72,27 +46,7 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRI
public class PulsarSource extends AbstractSource {
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- private class SourceData {
-
- private byte[] data;
- private Long offset;
- }
-
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarSource.class);
- private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
- 0, Integer.MAX_VALUE,
- 1L, TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- new AgentThreadFactory("pulsar-source"));
- private BlockingQueue<SourceData> queue;
- public InstanceProfile profile;
- private int maxPackSize;
- private String inlongStreamId;
- private String taskId;
- private String instanceId;
private String topic;
private String serviceUrl;
private String subscription;
@@ -100,18 +54,10 @@ public class PulsarSource extends AbstractSource {
private String subscriptionPosition;
private PulsarClient pulsarClient;
private Long timestamp;
- private volatile boolean running = false;
- private volatile boolean runnable = true;
- private volatile AtomicLong emptyCount = new AtomicLong(0);
-
- private final Integer CACHE_QUEUE_SIZE = 100000;
- private final Integer READ_WAIT_TIMEOUT_MS = 10;
- private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
- private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
- private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
- private boolean isRealTime = false;
private boolean isRestoreFromDB = false;
+ private Consumer<byte[]> consumer;
+ private long offset = 0L;
public PulsarSource() {
}
@@ -120,18 +66,7 @@ public class PulsarSource extends AbstractSource {
public void init(InstanceProfile profile) {
try {
LOGGER.info("PulsarSource init: {}", profile.toJsonStr());
- this.profile = profile;
super.init(profile);
- String cycleUnit = profile.get(TASK_CYCLE_UNIT);
- if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- isRealTime = true;
- cycleUnit = CycleUnitType.HOUR;
- }
- queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
- inlongStreamId = profile.getInlongStreamId();
- taskId = profile.getTaskId();
- instanceId = profile.getInstanceId();
topic = profile.getInstanceId();
serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
subscription = profile.get(TASK_PULSAR_SUBSCRIPTION,
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
@@ -141,183 +76,101 @@ public class PulsarSource extends AbstractSource {
timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
pulsarClient =
PulsarClient.builder().serviceUrl(serviceUrl).build();
isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
-
- EXECUTOR_SERVICE.execute(run());
+ consumer = getConsumer();
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + topic, ex);
}
}
- private Runnable run() {
- return () -> {
- AgentThreadFactory.nameThread("pulsar-source-" + taskId + "-" +
instanceId);
- running = true;
- try {
- try (Consumer<byte[]> consumer =
pulsarClient.newConsumer(Schema.BYTES)
- .topic(topic)
- .subscriptionName(subscription)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
-
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
- .subscribe()) {
-
- if (!isRestoreFromDB && timestamp != 0L) {
- consumer.seek(timestamp);
- LOGGER.info("Reset consume from {}", timestamp);
- } else {
- LOGGER.info("Skip to reset consume");
- }
-
- doRun(consumer);
- }
- } catch (Throwable e) {
- LOGGER.error("do run error maybe pulsar client is configured
incorrectly: ", e);
- }
- running = false;
- };
- }
-
- private void doRun(Consumer<byte[]> consumer) throws PulsarClientException
{
- long lastPrintTime = 0;
- while (isRunnable()) {
- boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- if (!suc) {
- break;
- }
- org.apache.pulsar.client.api.Message<byte[]> message =
consumer.receive(0, TimeUnit.MILLISECONDS);
- if (ObjectUtils.isEmpty(message)) {
- if (queue.isEmpty()) {
- emptyCount.incrementAndGet();
- } else {
- emptyCount.set(0);
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- AgentUtils.silenceSleepInSeconds(1);
- continue;
- }
- emptyCount.set(0);
- long offset = 0L;
- SourceData sourceData = new SourceData(message.getValue(), 0L);
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, message.getValue().length);
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(sourceData);
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- consumer.acknowledge(message);
-
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
- }
- }
+ @Override
+ protected String getThreadName() {
+ return "pulsar-source-" + taskId + "-" + instanceId;
}
- public boolean isRunnable() {
- return runnable;
+ @Override
+ protected boolean doPrepareToRead() {
+ return true;
}
- private boolean waitForPermit(String permitName, int permitLen) {
- boolean suc = false;
- while (!suc) {
- suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
- if (!suc) {
- MemoryManager.getInstance().printDetail(permitName, "log file
source");
- if (!isRunnable()) {
- return false;
- }
- AgentUtils.silenceSleepInSeconds(1);
- }
+ @Override
+ protected List<SourceData> readFromSource() {
+ List<SourceData> dataList = new ArrayList<>();
+ org.apache.pulsar.client.api.Message<byte[]> message = null;
+ try {
+ message = consumer.receive(0, TimeUnit.MILLISECONDS);
+ offset = message.getSequenceId();
+ } catch (PulsarClientException e) {
+ LOGGER.error("read from pulsar error", e);
}
- return true;
+ if (!ObjectUtils.isEmpty(message)) {
+ dataList.add(new SourceData(message.getValue(), 0L));
+ }
+ try {
+ consumer.acknowledge(message);
+ } catch (PulsarClientException e) {
+ LOGGER.error("ack pulsar error", e);
+ }
+ return dataList;
}
- private void putIntoQueue(SourceData sourceData) {
- if (sourceData == null) {
- return;
- }
+ private Consumer<byte[]> getConsumer() {
+ Consumer<byte[]> consumer = null;
try {
- boolean offerSuc = false;
- if (queue.remainingCapacity() > 0) {
- while (isRunnable() && !offerSuc) {
- offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
- }
+ consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName(subscription)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
+
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+ .subscribe();
+ if (!isRestoreFromDB && timestamp != 0L) {
+ consumer.seek(timestamp);
+ LOGGER.info("Reset consume from {}", timestamp);
+ } else {
+ LOGGER.info("Skip to reset consume");
}
-
- if (!offerSuc) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
+ return consumer;
+ } catch (PulsarClientException | IllegalArgumentException e) {
+ if (consumer == null) {
+ try {
+ consumer.close();
+ } catch (PulsarClientException ex) {
+ LOGGER.error("close consumer error", e);
+ }
}
- LOGGER.debug("Read {} from pulsar topic {}", sourceData.getData(),
topic);
- } catch (InterruptedException e) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
- LOGGER.error("fetchData offer failed {}", e.getMessage());
+ LOGGER.error("get consumer error", e);
}
+ return null;
}
@Override
- public List<Reader> split(TaskProfile conf) {
- return null;
+ protected void printCurrentState() {
+ LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
}
@Override
- public Message read() {
- SourceData sourceData = null;
- try {
- sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOGGER.warn("poll {} data get interrupted.", topic, e);
- }
- if (sourceData == null) {
- return null;
- }
- MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
- Message finalMsg = createMessage(sourceData);
- return finalMsg;
+ protected boolean isRunnable() {
+ return runnable;
}
- private Message createMessage(SourceData sourceData) {
- String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
- Map<String, String> header = new HashMap<>();
- header.put(PROXY_KEY_DATA, proxyPartitionKey);
- header.put(OFFSET, sourceData.offset.toString());
- header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
-
- long auditTime;
- if (isRealTime) {
- auditTime = AgentUtils.getCurrentTime();
- } else {
- auditTime = profile.getSinkDataTime();
- }
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- auditTime, 1, sourceData.data.length);
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
- AgentUtils.getCurrentTime(), 1, sourceData.data.length);
- Message finalMsg = new DefaultMessage(sourceData.data, header);
- if (finalMsg.getBody().length > maxPackSize) {
- LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
- finalMsg.getBody().length, maxPackSize);
- return null;
+ @Override
+ protected void releaseSource() {
+ if (consumer != null) {
+ try {
+ consumer.close();
+ } catch (PulsarClientException e) {
+ LOGGER.error("close consumer error", e);
+ }
}
- return finalMsg;
}
@Override
- public boolean sourceFinish() {
- if (isRealTime) {
- return false;
- }
- return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST;
+ public List<Reader> split(TaskProfile conf) {
+ return null;
}
@Override
public boolean sourceExist() {
return true;
}
-
- /**
- * Stop running threads.
- */
- public void stopRunning() {
- runnable = false;
- }
}
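In the PulsarSource above, getConsumer() now builds a plain Pulsar Consumer and readFromSource() receives and acknowledges one message per call. The standalone sketch below shows that receive/acknowledge pattern using only the standard Pulsar client API; the service URL, topic, subscription and the pollOnce method name are placeholders, and the same org.apache.pulsar.client.api imports this file already uses are assumed. Worth noting: receive(0, TimeUnit.MILLISECONDS) is non-blocking and returns null when nothing is pending, so the acknowledge call needs a null check.

    // Standalone sketch; values are placeholders, not InLong configuration.
    void pollOnce() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("example-topic")
                .subscriptionName("inlong-agent-example")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        org.apache.pulsar.client.api.Message<byte[]> message =
                consumer.receive(0, TimeUnit.MILLISECONDS);   // null when nothing is pending
        if (message != null) {
            byte[] payload = message.getValue();              // wrapped into SourceData by readFromSource()
            consumer.acknowledge(message);
        }
        consumer.close();
        client.close();
    }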
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index 9f3671355e..a844ba38cb 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -49,11 +49,41 @@ public class RedisSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index f199f983eb..8fceb8f635 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -48,11 +48,41 @@ public class SQLServerSource extends AbstractSource {
return readerList;
}
+ @Override
+ protected String getThreadName() {
+ return null;
+ }
+
+ @Override
+ protected void printCurrentState() {
+
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ return false;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ return null;
+ }
+
@Override
public Message read() {
return null;
}
+ @Override
+ protected boolean isRunnable() {
+ return runnable;
+ }
+
+ @Override
+ protected void releaseSource() {
+
+ }
+
@Override
public boolean sourceFinish() {
return false;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 5dc79377de..d9e8b9834a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -17,22 +17,77 @@
package org.apache.inlong.agent.plugin.sources.file;
+import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.metric.MetricRegister;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
+import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
public abstract class AbstractSource implements Source {
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ protected class SourceData {
+
+ private byte[] data;
+ private Long offset;
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSource.class);
+
+ protected final Integer BATCH_READ_LINE_COUNT = 10000;
+ protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
+ protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
+ protected final Integer READ_WAIT_TIMEOUT_MS = 10;
+ private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+ private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+ protected BlockingQueue<SourceData> queue;
+
protected String inlongGroupId;
protected String inlongStreamId;
// metric
@@ -41,11 +96,169 @@ public abstract class AbstractSource implements Source {
protected String metricName;
protected Map<String, String> dimensions;
protected static final AtomicLong METRIX_INDEX = new AtomicLong(0);
+ protected volatile boolean runnable = true;
+ protected volatile boolean running = false;
+ protected String taskId;
+ protected String instanceId;
+ protected InstanceProfile profile;
+ private ExtendedHandler extendedHandler;
+ private boolean isRealTime = false;
+ protected volatile long emptyCount = 0;
+ protected int maxPackSize;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("source-pool"));
+ protected OffsetProfile offsetProfile;
@Override
public void init(InstanceProfile profile) {
+ this.profile = profile;
+ taskId = profile.getTaskId();
+ instanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
+ maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+ if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ }
+ initOffset();
+ registerMetric();
+ initExtendHandler();
+ }
+
+ protected void initOffset() {
+ offsetProfile = OffsetManager.getInstance().getOffset(taskId,
instanceId);
+ }
+
+ @Override
+ public void start() {
+ EXECUTOR_SERVICE.execute(run());
+ }
+
+ private Runnable run() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error maybe file deleted: ", e);
+ }
+ running = false;
+ };
+ }
+
+ private void doRun() {
+ long lastPrintTime = 0;
+ while (isRunnable()) {
+ if (!prepareToRead()) {
+ break;
+ }
+            List<SourceData> lines = readFromSource();
+            if (lines == null || lines.isEmpty()) {
+ if (queue.isEmpty()) {
+ emptyCount++;
+ } else {
+ emptyCount = 0;
+ }
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ AgentUtils.silenceSleepInSeconds(1);
+ continue;
+ }
+ emptyCount = 0;
+ for (int i = 0; i < lines.size(); i++) {
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+ if (!suc4Queue) {
+ break;
+ }
+ putIntoQueue(lines.get(i));
+ }
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
+ lastPrintTime = AgentUtils.getCurrentTime();
+ printCurrentState();
+ }
+ }
+ }
+
+ protected abstract void printCurrentState();
+
+ /**
+     * Before reading from the data source, do the necessary preparation, such as acquiring the memory control
+     * semaphore and verifying that the data source is valid
+ *
+ * @return true if prepared ok
+ */
+ private boolean prepareToRead() {
+ if (!doPrepareToRead()) {
+ return false;
+ }
+ return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ }
+
+ /**
+     * All preparatory work other than acquiring the memory control semaphore is implemented by this method
+ *
+ * @return true if prepared ok
+ */
+ protected abstract boolean doPrepareToRead();
+
+ /**
+     * After the preparation work is done, actually read data from the data source
+ *
+ * @return source data list
+ */
+ protected abstract List<SourceData> readFromSource();
+
+ private boolean waitForPermit(String permitName, int permitLen) {
+ boolean suc = false;
+ while (!suc) {
+ suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
+ if (!suc) {
+ MemoryManager.getInstance().printDetail(permitName, "source");
+ if (!isRunnable()) {
+ return false;
+ }
+ AgentUtils.silenceSleepInSeconds(1);
+ }
+ }
+ return true;
+ }
+
+ /**
+     * Put one piece of source data into the cache queue; release the queue permit if the offer fails
+ */
+ private void putIntoQueue(SourceData sourceData) {
+ if (sourceData == null) {
+ return;
+ }
+ try {
+ boolean offerSuc = false;
+ while (isRunnable() && !offerSuc) {
+ offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+ }
+ if (!offerSuc) {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.getData().length);
+ }
+ LOGGER.debug("Read {} from source {}", sourceData.getData(),
inlongGroupId);
+ } catch (InterruptedException e) {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.getData().length);
+ LOGGER.error("fetchData offer failed", e);
+ }
+ }
+
+ /**
+ * Returns the name of the thread, used to identify different data sources
+ *
+     * @return thread name
+ */
+ protected abstract String getThreadName();
+
+ private void registerMetric() {
// register metric
this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
@@ -58,7 +271,129 @@ public abstract class AbstractSource implements Source {
sourceMetric = metricItemSet.findMetricItem(dimensions);
}
+ private void initExtendHandler() {
+ if
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
!= 0) {
+            Constructor<?> constructor = null;
+ try {
+ constructor = Class.forName(
+ profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS,
DEFAULT_FILE_SOURCE_EXTEND_CLASS))
+ .getDeclaredConstructor(InstanceProfile.class);
+ } catch (NoSuchMethodException e) {
+ LOGGER.error("init {} NoSuchMethodException error",
instanceId, e);
+ } catch (ClassNotFoundException e) {
+ LOGGER.error("init {} ClassNotFoundException error",
instanceId, e);
+ }
+            if (constructor == null) {
+                return;
+            }
+            constructor.setAccessible(true);
+ try {
+ extendedHandler = (ExtendedHandler)
constructor.newInstance(profile);
+ } catch (InstantiationException e) {
+ LOGGER.error("init {} InstantiationException error",
instanceId, e);
+ } catch (IllegalAccessException e) {
+ LOGGER.error("init {} IllegalAccessException error",
instanceId, e);
+ } catch (InvocationTargetException e) {
+ LOGGER.error("init {} InvocationTargetException error",
instanceId, e);
+ }
+ }
+ }
+
+ @Override
+ public Message read() {
+ SourceData sourceData = null;
+ try {
+ sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("poll {} data get interrupted.", instanceId);
+ }
+ if (sourceData == null) {
+ return null;
+ }
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.getData().length);
+ return createMessage(sourceData);
+ }
+
+ private Message createMessage(SourceData sourceData) {
+ String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
+ Map<String, String> header = new HashMap<>();
+ header.put(PROXY_KEY_DATA, proxyPartitionKey);
+ header.put(OFFSET, sourceData.getOffset().toString());
+ header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+ if (extendedHandler != null) {
+ extendedHandler.dealWithHeader(header, sourceData.getData());
+ }
+ long auditTime = 0;
+ if (isRealTime) {
+ auditTime = AgentUtils.getCurrentTime();
+ } else {
+ auditTime = profile.getSinkDataTime();
+ }
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
+ auditTime, 1, sourceData.getData().length);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+ AgentUtils.getCurrentTime(), 1, sourceData.getData().length);
+ Message finalMsg = new DefaultMessage(sourceData.getData(), header);
+        // if the message size is greater than the max pack size, drop it
+ if (finalMsg.getBody().length > maxPackSize) {
+ LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
+ finalMsg.getBody().length, maxPackSize);
+ return null;
+ }
+ return finalMsg;
+ }
+
+ /**
+     * Whether the worker thread should keep running in its while loop.
+ *
+ * @return true if threads can run
+ */
+ protected abstract boolean isRunnable();
+
+ /**
+ * Stop running threads.
+ */
+ public void stopRunning() {
+ runnable = false;
+ }
+
public void destroy() {
+ LOGGER.info("destroy read source name {}", instanceId);
+ stopRunning();
+ while (running) {
+ AgentUtils.silenceSleepInMs(1);
+ }
+ clearQueue(queue);
+ releaseSource();
+ LOGGER.info("destroy read source name {} end", instanceId);
+ }
+
+ /**
+ * Release the source resource if needed
+ */
+ protected abstract void releaseSource();
+ private void clearQueue(BlockingQueue<SourceData> queue) {
+ if (queue == null) {
+ return;
+ }
+ while (queue != null && !queue.isEmpty()) {
+ SourceData sourceData = null;
+ try {
+ sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("poll {} data get interrupted.", instanceId, e);
+ }
+ if (sourceData != null) {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.getData().length);
+ }
+ }
+ queue.clear();
+ }
+
+ @Override
+ public boolean sourceFinish() {
+ if (isRealTime) {
+ return false;
+ }
+ return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}
}
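
For readers who want to add a new source on top of this abstraction, a minimal sketch follows. It is not part of this commit: the class name DemoSource and its in-memory payload are hypothetical, the import paths for Reader and TaskProfile are assumptions, and it presumes split and sourceExist are the only Source methods that AbstractSource leaves unimplemented besides the abstract hooks above. The base class owns the worker thread, memory permits, cache queue, offsets and metrics, so a concrete source mainly has to produce SourceData batches.

    package org.apache.inlong.agent.plugin.sources;

    import org.apache.inlong.agent.conf.TaskProfile;
    import org.apache.inlong.agent.plugin.file.Reader;
    import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;

    public class DemoSource extends AbstractSource {

        private static final Logger LOGGER = LoggerFactory.getLogger(DemoSource.class);

        private long counter = 0;

        @Override
        protected String getThreadName() {
            // names the worker thread started by AbstractSource.start()
            return "demo-source-" + taskId + "-" + instanceId;
        }

        @Override
        protected void printCurrentState() {
            // invoked by the base read loop at most about once per second
            LOGGER.info("demo source {} produced {} records, queue size {}", instanceId, counter, queue.size());
        }

        @Override
        protected boolean doPrepareToRead() {
            // nothing to validate for this in-memory demo; a file source would check the file here
            return true;
        }

        @Override
        protected List<SourceData> readFromSource() {
            // one record per batch; a real source should respect BATCH_READ_LINE_COUNT
            // and BATCH_READ_LINE_TOTAL_LEN when building the batch
            byte[] data = ("demo record " + counter).getBytes(StandardCharsets.UTF_8);
            return Collections.singletonList(new SourceData(data, counter++));
        }

        @Override
        protected boolean isRunnable() {
            return runnable;
        }

        @Override
        protected void releaseSource() {
            // nothing to close for this demo; a client-based source would close its connection here
        }

        @Override
        public List<Reader> split(TaskProfile conf) {
            return null;
        }

        @Override
        public boolean sourceExist() {
            return true;
        }
    }
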
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java
deleted file mode 100644
index 63b0ae32ba..0000000000
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader.file;
-
-import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.sources.LogFileSource;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
-import org.apache.inlong.agent.plugin.utils.PluginUtils;
-
-import com.google.gson.Gson;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.dsl.MixedOperation;
-import io.fabric8.kubernetes.client.dsl.PodResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Objects;
-
-import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
-import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
-import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
-import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_CONTAINER_ID;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_CONTAINER_NAME;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_NAMESPACE;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_LABEL;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_NAME;
-import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_UID;
-
-/**
- * k8s file reader
- */
-public final class KubernetesMetadataProvider {
-
- private static final Logger log =
LoggerFactory.getLogger(KubernetesMetadataProvider.class);
- private static final Gson GSON = new Gson();
-
- private KubernetesClient client;
- private LogFileSource fileReaderOperator;
-
- public KubernetesMetadataProvider(LogFileSource fileReaderOperator) {
- this.fileReaderOperator = fileReaderOperator;
- }
-
- public void getData() {
- if (Objects.nonNull(client) &&
Objects.nonNull(fileReaderOperator.metadata)) {
- return;
- }
- try {
- client = PluginUtils.getKubernetesClient();
- } catch (IOException e) {
- log.error("get k8s client error: ", e);
- }
- getK8sMetadata(fileReaderOperator.profile);
- }
-
- /**
- * get PODS of kubernetes information
- */
- public PodList getPods() {
- if (Objects.isNull(client)) {
- return null;
- }
- MixedOperation<Pod, PodList, PodResource> pods = client.pods();
- return pods.list();
- }
-
- /**
- * get pod metadata by namespace and pod name
- */
- public void getK8sMetadata(InstanceProfile jobConf) {
- if (Objects.isNull(jobConf)) {
- return;
- }
- Map<String, String> k8sInfo =
MetaDataUtils.getLogInfo(fileReaderOperator.getFile().getName());
- log.info("file name is: {}, k8s information size: {}",
fileReaderOperator.getFile().getName(), k8sInfo.size());
- if (k8sInfo.isEmpty()) {
- return;
- }
-
- Map<String, String> metadata = fileReaderOperator.metadata;
- metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
- metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
- metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
- metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
-
- PodResource podResource =
client.pods().inNamespace(k8sInfo.get(NAMESPACE))
- .withName(k8sInfo.get(POD_NAME));
- if (Objects.isNull(podResource)) {
- return;
- }
- Pod pod = podResource.get();
- PodList podList =
client.pods().inNamespace(k8sInfo.get(NAMESPACE)).list();
- podList.getItems().forEach(data -> {
- if (data.equals(pod)) {
- metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
- metadata.put(METADATA_POD_LABEL,
GSON.toJson(pod.getMetadata().getLabels()));
- }
- });
- return;
- }
-}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index fb404dea0a..a9d683750c 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -94,6 +94,7 @@ public class TestLogFileSource {
OffsetManager.getInstance().setOffset(offsetProfile);
}
source.init(instanceProfile);
+ source.start();
return source;
} catch (Exception e) {
LOGGER.error("source init error {}", e);