This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eb5f890a23 [INLONG-9149][Agent] Add sender manager for file collect
(#9150)
eb5f890a23 is described below
commit eb5f890a23e5b556d1c4e6c841b298c72388af0c
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Oct 30 14:10:28 2023 +0800
[INLONG-9149][Agent] Add sender manager for file collect (#9150)
---
.../plugin/sinks/filecollect/SenderManager.java | 444 +++++++++++++++++++++
.../agent/plugin/task/filecollect/AgentErrMsg.java | 54 +--
.../agent/plugin/task/filecollect/FileScanner.java | 6 +-
.../agent/plugin/task/filecollect/WatchEntity.java | 2 +-
.../agent/plugin/utils/file/NewDateUtils.java | 2 +-
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 22 +-
.../sinks/filecollect/TestSenderManager.java | 134 +++++++
7 files changed, 619 insertions(+), 45 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
new file mode 100755
index 0000000000..a0e4d46c7f
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks.filecollect;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.file.MemoryManager;
+import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
+import org.apache.inlong.agent.message.filecollect.SenderMessage;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.message.SequentialID;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.constant.ProtocolType;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND;
+import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
+import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+
+/**
+ * Sends file-collect data to DataProxy and tracks package acknowledgements.
+ *
+ * <p>Every package handed to {@link #sendBatch(SenderMessage)} is appended to an ordered ack list
+ * before it is sent. A background loop periodically removes the longest prefix of acked packages,
+ * releases their {@code AGENT_GLOBAL_WRITER_PERMIT} memory and saves the offset of the last removed
+ * package, so the saved offset never moves past a package that has not been acked. Packages whose
+ * send was not acknowledged with {@code SendResult.OK} (or that raised an exception in the callback)
+ * are queued and sent again by a second background loop.
+ */
+public class SenderManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
+    private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
+    // pause between two offset flushes
+    public final int SAVE_OFFSET_INTERVAL_MS = 1000;
+    private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+    private DefaultMessageSender sender;
+    // callbacks of packages that were not acked successfully and must be sent again
+    private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("sender-manager"));
+    // thread factory handed to this instance's message sender for its worker threads
+    private ThreadFactory senderThreadFactory;
+    private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+    private final String managerHost;
+    private final int managerPort;
+    private final String netTag;
+    private final String localhost;
+    private final boolean isLocalVisit;
+    private final int totalAsyncBufSize;
+    private final int aliveConnectionNum;
+    private final boolean isCompress;
+    private final int msgType;
+    private final boolean isFile;
+    private final long maxSenderTimeout;
+    private final int maxSenderRetry;
+    private final long retrySleepTime;
+    private final String inlongGroupId;
+    private final int maxSenderPerGroup;
+    private final String sourcePath;
+    private final boolean proxySend;
+    private volatile boolean shutdown = false;
+    // metric
+    private AgentMetricItemSet metricItemSet;
+    private Map<String, String> dimensions;
+    private OffsetManager offsetManager;
+    private int ioThreadNum;
+    private boolean enableBusyWait;
+    private String authSecretId;
+    private String authSecretKey;
+    protected int batchFlushInterval;
+    // packages in send order; guarded by packageAckInfoLock
+    private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
+    private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true);
+    protected InstanceProfile profile;
+    private Random testRandom = new Random();
+    // true while the corresponding background loop is running; Stop() waits for both to clear
+    private volatile boolean offsetRunning = false;
+    private volatile boolean resendRunning = false;
+    private volatile boolean started = false;
+
+    /**
+     * Read sender settings from the agent configuration and the instance profile and register the
+     * metric item set of this manager. No sender is created until {@link #Start()} is called.
+     *
+     * @param profile instance profile supplying proxy settings, task id, instance id and inode info
+     * @param inlongGroupId group id, also used as the tag name of the message sender
+     * @param sourcePath source path, used in thread names and log messages
+     */
+    public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) {
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        this.profile = profile;
+        managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
+        managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
+        proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
+        localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST);
+        netTag = profile.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG);
+        isLocalVisit = profile.getBoolean(
+                CommonConstants.PROXY_IS_LOCAL_VISIT, CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT);
+        totalAsyncBufSize = profile.getInt(
+                CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
+                CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+        aliveConnectionNum = profile.getInt(
+                CommonConstants.PROXY_ALIVE_CONNECTION_NUM, CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
+        isCompress = profile.getBoolean(
+                CommonConstants.PROXY_IS_COMPRESS, CommonConstants.DEFAULT_PROXY_IS_COMPRESS);
+        maxSenderPerGroup = profile.getInt(
+                CommonConstants.PROXY_MAX_SENDER_PER_GROUP, CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP);
+        msgType = profile.getInt(CommonConstants.PROXY_MSG_TYPE, CommonConstants.DEFAULT_PROXY_MSG_TYPE);
+        maxSenderTimeout = profile.getInt(
+                CommonConstants.PROXY_SENDER_MAX_TIMEOUT, CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
+        maxSenderRetry = profile.getInt(
+                CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
+        retrySleepTime = profile.getLong(
+                CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
+        isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
+        offsetManager = OffsetManager.init();
+        ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
+                CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
+        enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
+                CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
+        batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+        authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+        authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+
+        this.sourcePath = sourcePath;
+        this.inlongGroupId = inlongGroupId;
+
+        this.dimensions = new HashMap<>();
+        dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
+        String metricName = String.join("-", this.getClass().getSimpleName(),
+                String.valueOf(METRIC_INDEX.incrementAndGet()));
+        this.metricItemSet = new AgentMetricItemSet(metricName);
+        MetricRegister.register(metricItemSet);
+        resendQueue = new LinkedBlockingQueue<>();
+    }
+
+    /**
+     * Create the message sender and start the offset-flush and resend background loops.
+     *
+     * @throws Exception if the message sender cannot be created
+     */
+    public void Start() throws Exception {
+        createMessageSender(inlongGroupId);
+        EXECUTOR_SERVICE.execute(flushOffset());
+        EXECUTOR_SERVICE.execute(flushResendQueue());
+        started = true;
+    }
+
+    /**
+     * Request shutdown and, if started, wait for both background loops to exit. Then close the
+     * sender and release the memory of every package still in the ack list.
+     */
+    public void Stop() {
+        LOGGER.info("stop send manager");
+        shutdown = true;
+        if (!started) {
+            return;
+        }
+        while (offsetRunning || resendRunning) {
+            AgentUtils.silenceSleepInMs(1);
+        }
+        closeMessageSender();
+        clearOffset();
+        LOGGER.info("stop send manager end");
+    }
+
+    private void closeMessageSender() {
+        if (sender != null) {
+            sender.close();
+        }
+    }
+
+    /**
+     * Find the metric item for the plugin id of this class plus the given dimensions.
+     */
+    private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
+        Map<String, String> allDimensions = new HashMap<>();
+        allDimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
+        allDimensions.putAll(otherDimensions);
+        return this.metricItemSet.findMetricItem(allDimensions);
+    }
+
+    private AgentMetricItem getMetricItem(String groupId, String streamId) {
+        Map<String, String> dims = new HashMap<>();
+        dims.put(KEY_INLONG_GROUP_ID, groupId);
+        dims.put(KEY_INLONG_STREAM_ID, streamId);
+        return getMetricItem(dims);
+    }
+
+    /**
+     * createMessageSender
+     *
+     * @param tagName we use group id as tag name
+     */
+    private void createMessageSender(String tagName) throws Exception {
+        ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
+                localhost, isLocalVisit, managerHost, managerPort, tagName, netTag, authSecretId, authSecretKey);
+        proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
+        proxyClientConfig.setFile(isFile);
+        proxyClientConfig.setAliveConnections(aliveConnectionNum);
+
+        proxyClientConfig.setIoThreadNum(ioThreadNum);
+        proxyClientConfig.setEnableBusyWait(enableBusyWait);
+        proxyClientConfig.setProtocolType(ProtocolType.TCP);
+
+        senderThreadFactory = new DefaultThreadFactory("agent-sender-manager-" + sourcePath,
+                Thread.currentThread().isDaemon());
+
+        DefaultMessageSender messageSender = new DefaultMessageSender(proxyClientConfig, senderThreadFactory);
+        messageSender.setMsgtype(msgType);
+        messageSender.setCompress(isCompress);
+        this.sender = messageSender;
+    }
+
+    /**
+     * Record the package in the ack list and send it, retrying while the send call throws.
+     * Pending resends are given priority: this call waits until the resend queue is empty, or until
+     * shutdown is requested (after which nothing drains the queue any more).
+     *
+     * @param message package to send
+     */
+    public void sendBatch(SenderMessage message) {
+        while (!shutdown && !resendQueue.isEmpty()) {
+            AgentUtils.silenceSleepInMs(retrySleepTime);
+        }
+        addAckInfo(message.getAckInfo());
+        sendBatchWithRetryCount(message, 0);
+    }
+
+    private void addAckInfo(PackageAckInfo info) {
+        packageAckInfoLock.writeLock().lock();
+        try {
+            packageAckInfoList.add(info);
+        } finally {
+            packageAckInfoLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @return true when every recorded package has been acked and removed from the ack list
+     */
+    public boolean sendFinished() {
+        packageAckInfoLock.readLock().lock();
+        try {
+            return packageAckInfoList.isEmpty();
+        } finally {
+            packageAckInfoLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Remove the longest prefix of acked packages from the ack list, release their memory and save
+     * the offset of the last removed package. Nothing is saved when the first package is unacked.
+     */
+    private void doFlushOffset() {
+        packageAckInfoLock.writeLock().lock();
+        try {
+            int ackedCount = 0;
+            while (ackedCount < packageAckInfoList.size() && packageAckInfoList.get(ackedCount).getHasAck()) {
+                ackedCount++;
+            }
+            if (ackedCount == 0) {
+                return;
+            }
+            // a sub-list view lets the whole acked prefix be dropped in one call instead of remove(0) per item
+            List<PackageAckInfo> ackedPrefix = packageAckInfoList.subList(0, ackedCount);
+            for (PackageAckInfo acked : ackedPrefix) {
+                MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, acked.getLen());
+            }
+            PackageAckInfo info = ackedPrefix.get(ackedCount - 1);
+            ackedPrefix.clear();
+            LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(),
+                    profile.getInstanceId());
+            OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
+                    info.getOffset(), profile.get(INODE_INFO));
+            offsetManager.setOffset(offsetProfile);
+        } finally {
+            packageAckInfoLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Drop every package from the ack list, acked or not, releasing its memory.
+     */
+    private void clearOffset() {
+        packageAckInfoLock.writeLock().lock();
+        try {
+            for (PackageAckInfo info : packageAckInfoList) {
+                MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
+            }
+            packageAckInfoList.clear();
+        } finally {
+            packageAckInfoLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Send message to proxy by batch, retrying while the send call throws. The retry loop is
+     * abandoned once shutdown is requested, because Stop() waits for the resend loop (which runs
+     * this method) and endless retries would block it forever.
+     *
+     * @param message package to send
+     * @param retry number of earlier attempts, used for log sampling and passed to the callback
+     */
+    private void sendBatchWithRetryCount(SenderMessage message, int retry) {
+        boolean suc = false;
+        while (!suc) {
+            try {
+                AgentSenderCallback cb = new AgentSenderCallback(message, retry);
+                asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(),
+                        message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(),
+                        maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend);
+                getMetricItem(message.getGroupId(), message.getStreamId()).pluginSendCount.addAndGet(
+                        message.getMsgCnt());
+                suc = true;
+            } catch (Exception exception) {
+                if (retry > maxSenderRetry) {
+                    // past the retry limit only every tenth failure is logged to limit log volume
+                    if (retry % 10 == 0) {
+                        LOGGER.error("max retry reached, sample log Exception caught", exception);
+                    }
+                } else {
+                    LOGGER.error("Exception caught", exception);
+                }
+                if (shutdown) {
+                    LOGGER.warn("sender manager is shutting down, stop retrying groupId {} streamId {}",
+                            message.getGroupId(), message.getStreamId());
+                    return;
+                }
+                retry++;
+                AgentUtils.silenceSleepInMs(retrySleepTime);
+            }
+        }
+    }
+
+    private void asyncSendByMessageSender(SendMessageCallback cb,
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
+        sender.asyncSendMessage(cb, bodyList, groupId,
+                streamId, dt, msgUUID,
+                timeout, timeUnit, extraAttrMap, isProxySend);
+    }
+
+    /**
+     * flushResendQueue
+     *
+     * @return thread runner that re-sends queued packages until shutdown
+     */
+    private Runnable flushResendQueue() {
+        return () -> {
+            AgentThreadFactory.nameThread(
+                    "flushResendQueue-" + profile.getTaskId() + "-" + profile.getInstanceId());
+            LOGGER.info("start flush resend queue {}:{}", inlongGroupId, sourcePath);
+            resendRunning = true;
+            try {
+                while (!shutdown) {
+                    try {
+                        AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS);
+                        if (callback != null) {
+                            sendBatchWithRetryCount(callback.message, callback.retry + 1);
+                        }
+                    } catch (Exception ex) {
+                        LOGGER.error("error caught", ex);
+                    } catch (Throwable t) {
+                        ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+                    } finally {
+                        AgentUtils.silenceSleepInMs(batchFlushInterval);
+                    }
+                }
+                LOGGER.info("stop flush resend queue {}:{}", inlongGroupId, sourcePath);
+            } finally {
+                // always clear the flag, otherwise Stop() would wait forever
+                resendRunning = false;
+            }
+        };
+    }
+
+    /**
+     * flushOffset
+     *
+     * @return thread runner that saves the acked offset every SAVE_OFFSET_INTERVAL_MS until shutdown
+     */
+    private Runnable flushOffset() {
+        return () -> {
+            AgentThreadFactory.nameThread(
+                    "flushOffset-" + profile.getTaskId() + "-" + profile.getInstanceId());
+            LOGGER.info("start flush offset {}:{}", inlongGroupId, sourcePath);
+            offsetRunning = true;
+            try {
+                while (!shutdown) {
+                    // a failing flush must not kill the loop: offsetRunning would stay true and hang Stop()
+                    try {
+                        doFlushOffset();
+                    } catch (Exception ex) {
+                        LOGGER.error("error caught", ex);
+                    } catch (Throwable t) {
+                        ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+                    }
+                    AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
+                }
+                LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourcePath);
+            } finally {
+                offsetRunning = false;
+            }
+        };
+    }
+
+    /**
+     * put the data into resend queue and will be resent later.
+     *
+     * @param batchMessageCallBack callback carrying the package and its retry count
+     */
+    private void putInResendQueue(AgentSenderCallback batchMessageCallBack) {
+        try {
+            resendQueue.put(batchMessageCallBack);
+        } catch (InterruptedException e) {
+            // restore the interrupt status for the caller's thread
+            Thread.currentThread().interrupt();
+            LOGGER.error("putInResendQueue interrupted", e);
+        } catch (Throwable throwable) {
+            LOGGER.error("putInResendQueue failed", throwable);
+        }
+    }
+
+    /**
+     * sender callback: marks the package acked on success, otherwise queues it for resending.
+     */
+    private class AgentSenderCallback implements SendMessageCallback {
+
+        private final int retry;
+        private final SenderMessage message;
+        private final int msgCnt;
+
+        AgentSenderCallback(SenderMessage message, int retry) {
+            this.message = message;
+            this.retry = retry;
+            this.msgCnt = message.getDataList().size();
+        }
+
+        @Override
+        public void onMessageAck(SendResult result) {
+            String groupId = message.getGroupId();
+            String streamId = message.getStreamId();
+            String taskId = message.getTaskId();
+            String instanceId = message.getInstanceId();
+            long dataTime = message.getDataTime();
+            if (result != null && result.equals(SendResult.OK)) {
+                message.getAckInfo().setHasAck(true);
+                getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+            } else {
+                LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+                        + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);
+                getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt);
+                putInResendQueue(new AgentSenderCallback(message, retry));
+            }
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            getMetricItem(message.getGroupId(), message.getStreamId()).pluginSendFailCount.addAndGet(msgCnt);
+            LOGGER.error("exception caught", e);
+            // the package is still unacked; without a resend it would block the saved offset forever
+            putInResendQueue(new AgentSenderCallback(message, retry));
+        }
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
index eff927736d..4768aa71c3 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
@@ -21,47 +21,47 @@ public class AgentErrMsg {
public static final String CONFIG_SUCCESS = "SUCCESS";
- // 数据源配置异常 */
- public static final String DATA_SOURCE_CONFIG_ERROR =
"ERROR-0-TDAgent|10001|ERROR"
+ // data source config error */
+ public static final String DATA_SOURCE_CONFIG_ERROR =
"ERROR-0-INLONG_AGENT|10001|ERROR"
+ "|ERROR_DATA_SOURCE_CONFIG|";
- // 监控文件夹不存在 */
- public static final String DIRECTORY_NOT_FOUND_ERROR =
"ERROR-0-TDAgent|11001|WARN"
+ // directory not found error */
+ public static final String DIRECTORY_NOT_FOUND_ERROR =
"ERROR-0-INLONG_AGENT|11001|WARN"
+ "|WARN_DIRECTORY_NOT_EXIST|";
- // 监控文件夹时出错 */
- public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR"
+ // watch directory error */
+ public static final String WATCH_DIR_ERROR =
"ERROR-0-INLONG_AGENT|11002|ERROR"
+ "|ERROR_WATCH_DIR_ERROR|";
- // 要读取的文件异常(不存在,rotate)
- public static final String FILE_ERROR =
"ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|";
+ // file error (not found, rotated)
+ public static final String FILE_ERROR =
"ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|";
- // 读取文件异常
- public static final String FILE_OP_ERROR =
"ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|";
+ // read file error
+ public static final String FILE_OP_ERROR =
"ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|";
- // 磁盘满
- public static final String DISK_FULL =
"ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|";
+ // disk full
+ public static final String DISK_FULL =
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|";
- // 内存溢出
- public static final String OOM_ERROR =
"ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|";
+ // out of memory
+ public static final String OOM_ERROR =
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|";
- // watcher异常
- public static final String WATCHER_INVALID =
"ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|";
+ // watcher error
+ public static final String WATCHER_INVALID =
"ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|";
- // 连不上tdmanager
- public static final String CONNECT_TDM_ERROR =
"ERROR-1-TDAgent|30002|ERROR"
- + "|ERROR_CANNOT_CONNECT_TO_TDM|";
+ // could not connect to manager
+ public static final String CONNECT_MANAGER_ERROR =
"ERROR-1-INLONG_AGENT|30002|ERROR"
+ + "|ERROR_CANNOT_CONNECT_TO_MANAGER|";
- // 发送数据到tdbus失败
- public static final String SEND_TO_BUS_ERROR =
"ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|";
+ // send data to dataProxy failed
+ public static final String SEND_TO_BUS_ERROR =
"ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|";
- // 操作bdb异常
- public static final String BDB_ERROR =
"ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|";
+ // operate bdb error
+ public static final String BDB_ERROR =
"ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|";
- // 内部缓存满
- public static final String MSG_BUFFER_FULL =
"ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|";
+ // buffer full
+ public static final String MSG_BUFFER_FULL =
"ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|";
- // 监控到的事件不合法(任务已删除)
- public static final String FOUND_EVENT_INVALID =
"ERROR-1-TDAgent|30003|ERROR"
+ // found event invalid (task has been deleted)
+ public static final String FOUND_EVENT_INVALID =
"ERROR-1-INLONG_AGENT|30003|ERROR"
+ "|FOUND_EVENT_INVALID|";
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index 46896d6cdc..eb25d60f82 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -37,7 +37,7 @@ import java.util.regex.Pattern;
/*
* This class is mainly used for scanning log file that we want to read. We
use this class at
- * tdagent recover process, the do and redo tasks and the current log file
access when we deploy a
+ * inlong_agent recover process, the do and redo tasks and the current log
file access when we deploy a
* new data source.
*/
public class FileScanner {
@@ -114,10 +114,6 @@ public class FileScanner {
private static ArrayList<String> getUpdatedOrNewFiles(String firstDir,
String secondDir,
String fileName, long depth, int maxFileNum) {
-
- // logger.info("getUpdatedOrNewFiles: firstdir: {}, seconddir: {}
filename: {}",
- // new Object[]{firstDir, secondDir, fileName});
-
ArrayList<String> ret = new ArrayList<String>();
ArrayList<File> readyFiles = new ArrayList<File>();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
index ffd74c0100..e0c6f464c6 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -198,7 +198,7 @@ public class WatchEntity {
logger.info("Register a new directory: " + dirName);
} catch (IOException e) {
/**
- * 捕获异常,不能注册的子目录就忽略。
+ * catch the error and skip any child directory that cannot be registered
*/
logger.error("Register directory {} error, skip it. ",
dirName, e);
continue;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 363b029783..935a1e4314 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -52,7 +52,7 @@ public class NewDateUtils {
public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
// data source config error */
- public static final String DATA_SOURCE_CONFIG_ERROR =
"ERROR-0-TDAgent|10001|ERROR"
+ public static final String DATA_SOURCE_CONFIG_ERROR =
"ERROR-0-INLONG_AGENT|10001|ERROR"
+ "|ERROR_DATA_SOURCE_CONFIG|";
/* Return the time in milliseconds for a data time. */
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 7850678acd..eccfe50b8c 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -88,18 +88,18 @@ public class AgentBaseTestsHelper {
private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
TaskStateEnum state) {
DataConfig dataConfig = new DataConfig();
- dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId
- dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId
- dataConfig.setDataReportType(1); // 老字段 reportType
- dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集
- dataConfig.setTaskId(taskId); // 老字段 任务 id
- dataConfig.setState(state.ordinal()); // 新增! 任务状态 1 正常 2 暂停
+ dataConfig.setInlongGroupId("testGroupId");
+ dataConfig.setInlongStreamId("testStreamId");
+ dataConfig.setDataReportType(1);
+ dataConfig.setTaskType(3);
+ dataConfig.setTaskId(taskId);
+ dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
- fileTaskConfig.setPattern(pattern);// 正则
- fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2
小时前的
- fileTaskConfig.setMaxFileCount(100); // 最大文件数
- fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时
- fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true
+ fileTaskConfig.setPattern(pattern);
+ fileTaskConfig.setTimeOffset("0d");
+ fileTaskConfig.setMaxFileCount(100);
+ fileTaskConfig.setCycleUnit("D");
+ fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
new file mode 100644
index 0000000000..084af58851
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks.filecollect;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
+import org.apache.inlong.agent.message.filecollect.SenderMessage;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SenderManager.class)
+@PowerMockIgnore({"javax.management.*"})
+public class TestSenderManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestSenderManager.class);
+ private static final ClassLoader LOADER =
TestSenderManager.class.getClassLoader();
+ private static AgentBaseTestsHelper helper;
+ private static InstanceProfile profile;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("TestLogfileCollectTask"));
+
+ @BeforeClass
+ public static void setup() {
+ String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
+ helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+ String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
+ profile = taskProfile.createInstanceProfile("", fileName,
+ "20230927");
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ helper.teardownAgentHome();
+ }
+
+ @Test
+ public void testNormalAck() {
+ List<SendMessageCallback> cbList = new ArrayList<>();
+ try {
+ profile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(profile.getInstanceId()));
+ SenderManager senderManager = PowerMockito.spy(new
SenderManager(profile, "inlongGroupId", "sourceName"));
+ PowerMockito.doNothing().when(senderManager,
"createMessageSender", Mockito.anyString());
+
+ PowerMockito.doAnswer(invocation -> {
+ SendMessageCallback cb = invocation.getArgument(0);
+ cbList.add(cb);
+ return null;
+ }).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
+ Mockito.anyLong(), Mockito.any(),
+ Mockito.any(), Mockito.anyBoolean());
+
+ senderManager.Start();
+ Long packageIndex = 0L;
+ Long packageOffset = 100L;
+ List<byte[]> bodyList = new ArrayList<>();
+ bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
+ Integer resultBatchSize = 0;
+ for (int i = 0; i < bodyList.size(); i++) {
+ resultBatchSize += bodyList.get(i).length;
+ }
+ for (int i = 0; i < 10; i++) {
+ PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++,
packageOffset, resultBatchSize, false);
+ SenderMessage senderMessage = new SenderMessage("taskId",
"instanceId", "groupId", "streamId", bodyList,
+ AgentUtils.getCurrentTime(), null, ackInfo);
+ senderManager.sendBatch(senderMessage);
+ packageOffset += 100;
+ }
+ Assert.assertTrue(cbList.size() == 10);
+ for (int i = 0; i < 5; i++) {
+ cbList.get(4 - i).onMessageAck(SendResult.OK);
+ }
+
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
!senderManager.sendFinished());
+ for (int i = 5; i < 10; i++) {
+ cbList.get(i).onMessageAck(SendResult.OK);
+ AgentUtils.silenceSleepInMs(10);
+ }
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
senderManager.sendFinished());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue("testNormalAck failed", false);
+ }
+ }
+}
\ No newline at end of file