This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fc8ef40ce0 [INLONG-9310][Agent] Add extended handler in file source
(#9311)
fc8ef40ce0 is described below
commit fc8ef40ce0f59bae6b2876e1c9eabe8d0851d648
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Nov 20 20:09:10 2023 +0800
[INLONG-9310][Agent] Add extended handler in file source (#9311)
* [INLONG-9310][Agent] Add extended handler in file source
* [INLONG-9310][Agent] Add extended handler in file source
* [INLONG-9310][Agent] Add extended handler in file source
---
.../inlong/agent/constant/TaskConstants.java | 3 ++
.../agent/message/filecollect/ProxyMessage.java | 2 +-
.../message/filecollect/ProxyMessageCache.java | 2 --
.../inlong/agent/plugin/sources/LogFileSource.java | 15 +++++++++
.../sources/file/extend/ExtendedHandler.java | 39 ++++++++++++++++++++++
5 files changed, 58 insertions(+), 3 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 872c42319f..c501ec110b 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -82,6 +82,9 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_END_TIME = "task.fileTask.endTime";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
public static final String PREDEFINE_FIELDS = "task.predefinedFields";
+ public static final String FILE_SOURCE_EXTEND_CLASS = "task.fileTask.extendedClass";
+ public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
+ "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
// Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
index 7d9f4930ac..e8b74f40b1 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
@@ -27,7 +27,7 @@ import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_I
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
/**
- * Bus message with body, header, inlongGroupId and inlongStreamId.
+ * proxy message with body, header, inlongGroupId and inlongStreamId.
*/
public class ProxyMessage implements Message {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 9c0c84e0ef..66b3fb3b43 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -59,7 +59,6 @@ public class ProxyMessageCache {
private final int cacheTimeout;
// streamId -> list of proxyMessage
private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>>
messageQueueMap;
- // private final LinkedBlockingQueue<ProxyMessage> messageQueue;
private final AtomicLong cacheSize = new AtomicLong(0);
private long lastPrintTime = 0;
private long dataTime;
@@ -77,7 +76,6 @@ public class ProxyMessageCache {
DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
this.cacheTimeout =
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS,
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
messageQueueMap = new ConcurrentHashMap<>();
- // this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
try {
dataTime =
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
instanceProfile.get(TASK_CYCLE_UNIT));
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index e2b0b06517..d713fb4e8a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -31,6 +31,7 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
import
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -51,6 +52,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -80,6 +82,7 @@ import static
org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM;
import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_FILE_NAME;
import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_NAME;
import static
org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
+import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
import static
org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST;
import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
@@ -134,6 +137,7 @@ public class LogFileSource extends AbstractSource {
private volatile boolean running = false;
private long dataTime = 0;
private volatile long emptyCount = 0;
+ private ExtendedHandler extendedHandler;
public LogFileSource() {
OffsetManager.init();
@@ -159,6 +163,14 @@ public class LogFileSource extends AbstractSource {
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
dataTime =
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(),
profile.get(TASK_CYCLE_UNIT));
+ if (DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName()) != 0) {
+ Constructor<?> constructor =
+ Class.forName(
+ profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, DEFAULT_FILE_SOURCE_EXTEND_CLASS))
+ .getDeclaredConstructor(InstanceProfile.class);
+ constructor.setAccessible(true);
+ extendedHandler = (ExtendedHandler) constructor.newInstance(profile);
+ }
try {
registerMeta(profile);
} catch (Exception ex) {
@@ -354,6 +366,9 @@ public class LogFileSource extends AbstractSource {
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.offset.toString());
header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+ if (extendedHandler != null) {
+ extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8));
+ }
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
dataTime, 1, msgWithMetaData.length());
Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
new file mode 100644
index 0000000000..8cd2e76f48
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.file.extend;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+
+import java.util.Map;
+
+// For some private, customized extension processing
+public abstract class ExtendedHandler {
+
+ public ExtendedHandler(InstanceProfile profile) {
+
+ }
+
+ // Modify the header by the body
+ public void dealWithHeader(Map<String, String> header, byte[] body) {
+
+ }
+
+ public static class Constants {
+
+ }
+}