This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9abfce4a92 [INLONG-11687][Agent] Optimize task main thread exception 
handling to prevent exception exits (#11688)
9abfce4a92 is described below

commit 9abfce4a92cc5c022b05c79cbad08bd12b91df44
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Jan 20 11:30:08 2025 +0800

    [INLONG-11687][Agent] Optimize task main thread exception handling to 
prevent exception exits (#11688)
---
 .../agent/core/instance/InstanceManager.java       |   5 +
 .../inlong/agent/plugin/sinks/ProxySink.java       |   6 +-
 .../inlong/agent/plugin/task/AbstractTask.java     |  38 +++---
 .../inlong/agent/plugin/task/TestSQLTask.java      | 147 +++++++++++++++++++++
 4 files changed, 176 insertions(+), 20 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3396c3a591..3f0a914e90 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -366,6 +366,11 @@ public class InstanceManager extends AbstractDaemon {
 
     private void deleteFromStore(String instanceId) {
         InstanceProfile profile = instanceStore.getInstance(taskId, 
instanceId);
+        if (profile == null) {
+            LOGGER.error("try to delete instance from store but not found: 
taskId {} instanceId {}", taskId,
+                    instanceId);
+            return;
+        }
         String inlongGroupId = profile.getInlongGroupId();
         String inlongStreamId = profile.getInlongStreamId();
         instanceStore.deleteInstance(taskId, instanceId);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index e00ad65cba..069932711c 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -140,7 +140,11 @@ public class ProxySink extends AbstractSink {
             LOGGER.info("start flush cache {}:{} flush interval {}", 
inlongGroupId, sourceName, batchFlushInterval);
             running = true;
             while (!shutdown) {
-                sendMessageFromCache();
+                try {
+                    sendMessageFromCache();
+                } catch (Throwable e) {
+                    LOGGER.error("send message from cache error: ", e);
+                }
                 AgentUtils.silenceSleepInMs(batchFlushInterval);
             }
             LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index d9ec53ab0b..ef8107c68e 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -109,32 +109,32 @@ public abstract class AbstractTask extends Task {
     public void run() {
         Thread.currentThread().setName("task-core-" + getTaskId());
         running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+        while (!isFinished()) {
+            try {
+                doRun();
+            } catch (Throwable e) {
+                LOGGER.error("do run error: ", e);
+                ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            }
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
         }
         running = false;
     }
 
     protected void doRun() {
-        while (!isFinished()) {
-            taskPrint();
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            if (!initOK) {
-                continue;
-            }
-            List<InstanceProfile> profileList = getNewInstanceList();
-            for (InstanceProfile profile : profileList) {
-                InstanceAction action = new InstanceAction(ActionType.ADD, 
profile);
-                while (!isFinished() && !instanceManager.submitAction(action)) 
{
-                    LOGGER.error("instance manager action queue is full: 
taskId {}", getTaskId());
-                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-                }
+        taskPrint();
+        if (!initOK) {
+            return;
+        }
+        List<InstanceProfile> profileList = getNewInstanceList();
+        for (InstanceProfile profile : profileList) {
+            InstanceAction action = new InstanceAction(ActionType.ADD, 
profile);
+            while (!isFinished() && !instanceManager.submitAction(action)) {
+                LOGGER.error("instance manager action queue is full: taskId 
{}", getTaskId());
+                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
             }
-            taskHeartbeat();
         }
+        taskHeartbeat();
     }
 
     protected abstract List<InstanceProfile> getNewInstanceList();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestSQLTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestSQLTask.java
new file mode 100644
index 0000000000..cc0eea8f5d
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestSQLTask.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.task.logcollection.SQLTask;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.management.*"})
+public class TestSQLTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestSQLTask.class);
+    private static final ClassLoader LOADER = 
TestSQLTask.class.getClassLoader();
+    private static AgentBaseTestsHelper helper;
+    private static TaskManager manager;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("TestSQLTask"));
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        helper = new 
AgentBaseTestsHelper(TestSQLTask.class.getName()).setupAgentHome();
+        manager = new TaskManager();
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        helper.teardownAgentHome();
+    }
+
+    @Test
+    public void testScan() {
+        doTest(1, "select * from table where field = YYYYMMDD_[0-9]+;", 
CycleUnitType.DAY,
+                Arrays.asList("select * from table where field = 
20230928_[0-9]+;",
+                        "select * from table where field = 20230929_[0-9]+;",
+                        "select * from table where field = 20230930_[0-9]+;"),
+                Arrays.asList("20230928", "20230929", "20230930"),
+                "20230928",
+                "20230930");
+        doTest(2, "select * from table where field = YYYYMMDDHH_[0-9]+;", 
CycleUnitType.HOUR,
+                Arrays.asList("select * from table where field = 
2023092823_[0-9]+;",
+                        "select * from table where field = 2023092900_[0-9]+;",
+                        "select * from table where field = 
2023092901_[0-9]+;"),
+                Arrays.asList("2023092823", "2023092900", "2023092901"), 
"2023092823", "2023092901");
+        doTest(3, "select * from table where field = YYYYMMDDHHmm_[0-9]+;", 
CycleUnitType.MINUTE,
+                Arrays.asList("select * from table where field = 
202309282359_[0-9]+;",
+                        "select * from table where field = 
202309290000_[0-9]+;",
+                        "select * from table where field = 
202309290001_[0-9]+;"),
+                Arrays.asList("202309282359", "202309290000", "202309290001"), 
"202309282359", "202309290001");
+    }
+
+    @Test
+    public void testScanLowercase() {
+        doTest(1, "select * from table where field = yyyyMMdd_[0-9]+;", 
CycleUnitType.DAY,
+                Arrays.asList("select * from table where field = 
20230928_[0-9]+;",
+                        "select * from table where field = 20230929_[0-9]+;",
+                        "select * from table where field = 20230930_[0-9]+;"),
+                Arrays.asList("20230928", "20230929", "20230930"),
+                "20230928",
+                "20230930");
+        doTest(2, "select * from table where field = yyyyMMddhh_[0-9]+;", 
CycleUnitType.HOUR,
+                Arrays.asList("select * from table where field = 
2023092823_[0-9]+;",
+                        "select * from table where field = 2023092900_[0-9]+;",
+                        "select * from table where field = 
2023092901_[0-9]+;"),
+                Arrays.asList("2023092823", "2023092900", "2023092901"), 
"2023092823", "2023092901");
+        doTest(3, "select * from table where field = yyyyMMddhhmm_[0-9]+;", 
CycleUnitType.MINUTE,
+                Arrays.asList("select * from table where field = 
202309282359_[0-9]+;",
+                        "select * from table where field = 
202309290000_[0-9]+;",
+                        "select * from table where field = 
202309290001_[0-9]+;"),
+                Arrays.asList("202309282359", "202309290000", "202309290001"), 
"202309282359", "202309290001");
+    }
+
+    private void doTest(int taskId, String sql, String cycle, List<String> 
srcSQLs, List<String> srcDataTimes,
+            String startTime, String endTime) {
+        TaskProfile taskProfile = helper.getSQLTaskProfile(taskId, sql, "csv", 
true, startTime, endTime,
+                TaskStateEnum.RUNNING, cycle, "GMT+8:00");
+        SQLTask sqlTask = null;
+        final List<String> fileName = new ArrayList();
+        final List<String> dataTime = new ArrayList();
+        try {
+            sqlTask = PowerMockito.spy(new SQLTask());
+            PowerMockito.doAnswer(invocation -> {
+                fileName.add(invocation.getArgument(0));
+                dataTime.add(invocation.getArgument(1));
+                return null;
+            }).when(sqlTask, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(),
+                    Mockito.anyString());
+            Assert.assertTrue(sqlTask.isProfileValid(taskProfile));
+            manager.getTaskStore().storeTask(taskProfile);
+            sqlTask.init(manager, taskProfile, 
manager.getInstanceBasicStore());
+            EXECUTOR_SERVICE.submit(sqlTask);
+        } catch (Exception e) {
+            LOGGER.error("source init error", e);
+            Assert.assertTrue("source init error", false);
+        }
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> fileName.size() == srcDataTimes.size() && 
dataTime.size() == srcDataTimes.size());
+        for (int i = 0; i < fileName.size(); i++) {
+            Assert.assertEquals(0, fileName.get(i).compareTo(srcSQLs.get(i)));
+            Assert.assertEquals(0, 
dataTime.get(i).compareTo(srcDataTimes.get(i)));
+        }
+        sqlTask.destroy();
+    }
+}
\ No newline at end of file

Reply via email to