aloyszhang commented on code in PR #10013:
URL: https://github.com/apache/inlong/pull/10013#discussion_r1570597467
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java:
##########
@@ -20,86 +20,48 @@
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
-import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
-public class KafkaTask extends Task {
+public class KafkaTask extends AbstractTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTask.class);
public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
- public static final int CORE_THREAD_SLEEP_TIME = 5000;
- public static final int CORE_THREAD_PRINT_TIME = 10000;
-
- private TaskProfile taskProfile;
- private Db basicDb;
- private TaskManager taskManager;
- private InstanceManager instanceManager;
- private long lastPrintTime = 0;
- private boolean initOK = false;
- private volatile boolean running = false;
private boolean isAdded = false;
- private boolean isRestoreFromDB = false;
-
private String topic;
@Override
- public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
- taskManager = (TaskManager) srcManager;
- commonInit(taskProfile, basicDb);
- initOK = true;
- }
-
- private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ protected void initTask() {
LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
- this.taskProfile = taskProfile;
- this.basicDb = basicDb;
this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
- this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
- instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
- basicDb, taskManager.getTaskDb());
- try {
- instanceManager.start();
- } catch (Exception e) {
- LOGGER.error("start instance manager error: ", e);
- }
}
@Override
- public void destroy() {
- doChangeState(State.SUCCEEDED);
- if (instanceManager != null) {
- instanceManager.stop();
- }
+ protected void releaseTask() {
}
@Override
- public TaskProfile getProfile() {
- return taskProfile;
- }
-
- @Override
- public String getTaskId() {
- if (taskProfile == null) {
- return "";
+ protected List<InstanceProfile> getNewInstanceList() {
+ List<InstanceProfile> list = new ArrayList<>();
+ if (isAdded) {
+ return list;
}
- return taskProfile.getTaskId();
+ String dataTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
Review Comment:
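There is no need to create a new DateTimeFormatter every time this method is
invoked. DateTimeFormatter is immutable and thread-safe, so it can be defined
once as a static final constant and reused.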
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]