This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 58ecec5a9d [INLONG-9190][Agent] Log file source clear buffer queue
does not take effect (#9191)
58ecec5a9d is described below
commit 58ecec5a9d57e6f00759825daccd2d7199abe5c7
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Nov 1 15:21:33 2023 +0800
[INLONG-9190][Agent] Log file source clear buffer queue does not take
effect (#9191)
---
.../inlong/agent/plugin/sources/LogFileSource.java | 2 +-
.../agent/plugin/sources/TestLogFileSource.java | 36 ++++++++++++++++++----
2 files changed, 31 insertions(+), 7 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index cc9f3724e8..ea0e63c95f 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -511,7 +511,7 @@ public class LogFileSource extends AbstractSource {
}
private void clearQueue(BlockingQueue<SourceData> queue) {
- if (queue != null) {
+ if (queue == null) {
return;
}
while (queue != null && !queue.isEmpty()) {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 6c3257adc9..23b5027812 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -46,9 +46,11 @@ public class TestLogFileSource {
private static final Logger LOGGER =
LoggerFactory.getLogger(TestLogFileSource.class);
private static final ClassLoader LOADER =
TestLogFileSource.class.getClassLoader();
- private static LogFileSource source;
private static AgentBaseTestsHelper helper;
private static final Gson GSON = new Gson();
+ private static final String[] check = {"hello line-end-symbol aa", "world
line-end-symbol",
+ "agent line-end-symbol"};
+ private static InstanceProfile instanceProfile;
@BeforeClass
public static void setup() {
@@ -57,11 +59,15 @@ public class TestLogFileSource {
helper = new
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
- InstanceProfile instanceProfile = taskProfile.createInstanceProfile("",
+ instanceProfile = taskProfile.createInstanceProfile("",
fileName, "20230928");
+
+ }
+
+ private LogFileSource getSource() {
try {
instanceProfile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
- source = new LogFileSource();
+ LogFileSource source = new LogFileSource();
Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
Whitebox.setInternalState(source, "PRINT_INTERVAL_MS", 0);
@@ -69,25 +75,31 @@ public class TestLogFileSource {
Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
source.init(instanceProfile);
+ return source;
} catch (Exception e) {
LOGGER.error("source init error {}", e);
Assert.assertTrue("source init error", false);
}
+ return null;
}
@AfterClass
public static void teardown() throws Exception {
- source.destroy();
helper.teardownAgentHome();
}
@Test
- public void testTaskManager() {
- String[] check = {"hello line-end-symbol aa", "world line-end-symbol",
"agent line-end-symbol"};
+ public void testLogFileSource() {
+ testFullRead();
+ testCleanQueue();
+ }
+
+ private void testFullRead() {
int srcLen = 0;
for (int i = 0; i < check.length; i++) {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
+ LogFileSource source = getSource();
await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
int cnt = 0;
int leftBeforeRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
@@ -101,9 +113,21 @@ public class TestLogFileSource {
msg = source.read();
cnt++;
}
+ source.destroy();
Assert.assertTrue(cnt == 3);
Assert.assertTrue(srcLen == readLen);
int leftAfterRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
Assert.assertTrue(leftAfterRead ==
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
}
+
+ private void testCleanQueue() {
+ LogFileSource source = getSource();
+ await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+ for (int i = 0; i < 2; i++) {
+ source.read();
+ }
+ source.destroy();
+ int leftAfterRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ Assert.assertTrue(leftAfterRead ==
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
+ }
}
\ No newline at end of file