This is an automated email from the ASF dual-hosted git repository.
yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 1d482fe0f2 refine monitor codes
1d482fe0f2 is described below
commit 1d482fe0f2081de45cdf3d9fb6c633002a40b7fd
Author: yasithdev <[email protected]>
AuthorDate: Tue Jul 15 01:37:22 2025 -0500
refine monitor codes
---
.../helix/impl/workflow/PostWorkflowManager.java | 193 ++++++++-------------
.../apache/airavata/monitor/AbstractMonitor.java | 35 ++--
.../airavata/monitor/email/EmailBasedMonitor.java | 46 +++--
3 files changed, 110 insertions(+), 164 deletions(-)
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 8fe307cbae..a0dc3be935 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -19,19 +19,8 @@
*/
package org.apache.airavata.helix.impl.workflow;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.*;
+import java.util.concurrent.*;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
@@ -61,12 +50,7 @@ import
org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.airavata.patform.monitoring.MonitoringServer;
import org.apache.airavata.registry.api.RegistryService;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
@@ -77,7 +61,7 @@ public class PostWorkflowManager extends WorkflowManager {
private static final Logger logger =
LoggerFactory.getLogger(PostWorkflowManager.class);
private static final CountMonitor postwfCounter = new
CountMonitor("post_wf_counter");
- private ExecutorService processingPool = Executors.newFixedThreadPool(10);
+ private final ExecutorService processingPool =
Executors.newFixedThreadPool(10);
public PostWorkflowManager() throws ApplicationSettingsException {
super(
@@ -85,6 +69,21 @@ public class PostWorkflowManager extends WorkflowManager {
Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
}
+ public static void main(String[] args) throws Exception {
+
+ if
(ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
+ MonitoringServer monitoringServer = new MonitoringServer(
+
ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
+
ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
+ monitoringServer.start();
+
+ Runtime.getRuntime().addShutdownHook(new
Thread(monitoringServer::stop));
+ }
+
+ PostWorkflowManager postManager = new PostWorkflowManager();
+ postManager.startServer();
+ }
+
private void init() throws Exception {
super.initComponents();
}
@@ -113,115 +112,89 @@ public class PostWorkflowManager extends WorkflowManager
{
RegistryService.Client registryClient =
getRegistryClientPool().getResource();
- try {
- logger.info("Processing job result of job id " +
jobStatusResult.getJobId() + " sent by "
- + jobStatusResult.getPublisherName());
-
- List<JobModel> jobs = registryClient.getJobs("jobId",
jobStatusResult.getJobId());
+ var jobId = jobStatusResult.getJobId();
+ var jobName = jobStatusResult.getJobName();
+ var jobState = jobStatusResult.getState();
+ var publisherId = jobStatusResult.getPublisherName();
+ logger.info("processing JobStatusUpdate<{}> from {}: {}", jobId,
publisherId, jobStatusResult);
- if (jobs.size() > 0) {
- logger.info("Filtering total " + jobs.size() + " with target
job name " + jobStatusResult.getJobName());
+ try {
+ List<JobModel> jobs = registryClient.getJobs("jobId", jobId);
+ logger.info("Found {} jobs in registry with id={}", jobs.size(),
jobId);
+ if (!jobs.isEmpty()) {
jobs = jobs.stream()
- .filter(jm ->
jm.getJobName().equals(jobStatusResult.getJobName()))
- .collect(Collectors.toList());
+ .filter(jm -> jm.getJobName().equals(jobName))
+ .toList();
+ logger.info("Found {} jobs in registry with id={} and
name={}", jobs.size(), jobId, jobName);
}
-
if (jobs.size() != 1) {
- logger.error("Couldn't find exactly one job with id " +
jobStatusResult.getJobId() + " and name "
- + jobStatusResult.getJobName() + " in the registry.
Count " + jobs.size());
+ logger.error("Found {} job(s) in registry with id={} and
name={}", jobs.size(), jobId, jobName);
getRegistryClientPool().returnResource(registryClient);
return false;
}
-
JobModel jobModel = jobs.get(0);
ProcessModel processModel =
registryClient.getProcess(jobModel.getProcessId());
ExperimentModel experimentModel =
registryClient.getExperiment(processModel.getExperimentId());
ProcessStatus processStatus =
registryClient.getProcessStatus(processModel.getProcessId());
+ var processState = processStatus.getState();
getRegistryClientPool().returnResource(registryClient);
- if (processModel != null && experimentModel != null) {
-
- jobModel.getJobStatuses()
-
.sort(Comparator.comparingLong(JobStatus::getTimeOfStateChange)
- .reversed());
+ if (experimentModel != null) {
+
jobModel.getJobStatuses().sort(Comparator.comparingLong(JobStatus::getTimeOfStateChange).reversed());
JobState currentJobStatus =
jobModel.getJobStatuses().get(0).getJobState();
+ logger.info("Last known state of job {} is {}", jobId, currentJobStatus);
- logger.info("Last known state of job " + jobModel.getJobId() +
" is " + currentJobStatus.name());
-
- if (!JobStateValidator.isValid(currentJobStatus,
jobStatusResult.getState())) {
- logger.warn("Job state of " + jobStatusResult.getJobId() +
" is not valid. Previous state "
- + currentJobStatus + ", new state " +
jobStatusResult.getState());
+ if (!JobStateValidator.isValid(currentJobStatus, jobState)) {
+ logger.warn("JobStatusUpdate<{}> invalid. prev={} ->
new={}", jobId, currentJobStatus, jobState);
return true;
}
- String gateway = experimentModel.getGatewayId();
+ String task = jobModel.getTaskId();
String processId = processModel.getProcessId();
+ String gateway = experimentModel.getGatewayId();
String experimentId = experimentModel.getExperimentId();
- String task = jobModel.getTaskId();
-
- logger.info("Updating the job status for job id : " +
jobStatusResult.getJobId() + " with process id "
- + processId + ", exp id " + experimentId + ", gateway
" + gateway + " and status "
- + jobStatusResult.getState().name());
- saveAndPublishJobStatus(
- jobStatusResult.getJobId(), task, processId,
experimentId, gateway, jobStatusResult.getState());
+ logger.info("saving JobStatusUpdate<{}>: pid={}, eid={},
gw={}, state={}",
+ jobId, processId, experimentId, gateway, jobState);
+ saveAndPublishJobStatus(jobId, task, processId, experimentId,
gateway, jobState);
// TODO get cluster lock before that
- if (ProcessState.CANCELLING.equals(processStatus.getState())
- ||
ProcessState.CANCELED.equals(processStatus.getState())) {
- logger.info("Cancelled post workflow for process " +
processId + " in experiment " + experimentId);
- // This will mark an cancelling Experiment into a
cancelled status for a set of valid job statuses
+ if (ProcessState.CANCELLING.equals(processState) ||
ProcessState.CANCELED.equals(processState)) {
+ logger.info("Cancelled post workflow for process {} in
experiment {}", processId, experimentId);
+ // This will mark a canceling Experiment with CANCELED
status for a set of valid job statuses
// This is a safety check. Cancellation is originally
handled in Job Cancellation Workflow
- switch (jobStatusResult.getState()) {
+ switch (jobState) {
case FAILED:
case SUSPENDED:
case CANCELED:
case COMPLETE:
- logger.info("Job " + jobStatusResult.getJobId() +
" status is " + jobStatusResult.getState()
- + " so marking experiment " + experimentId
+ " as cancelled");
+ logger.info("canceled job={}: eid={}, state={}",
jobId, experimentId, jobState);
publishProcessStatus(processId, experimentId,
gateway, ProcessState.CANCELED);
break;
default:
- logger.warn("Job " + jobStatusResult.getJobId() +
" status " + jobStatusResult.getState()
- + " is invalid to mark experiment " +
experimentId + " as cancelled");
+ logger.warn("skipping job={}: eid={}, state={}",
jobId, experimentId, jobState);
}
} else {
-
- if (jobStatusResult.getState() == JobState.COMPLETE
- || jobStatusResult.getState() == JobState.FAILED) {
- // if the job is FAILED, still run output staging
tasks to debug the reason for failure. And
- // update
- // the experiment status as COMPLETED as this job
failure is not related to Airavata scope.
- logger.info("Starting the post workflow for job id : "
+ jobStatusResult.getJobId()
- + " with process id " + processId + ", gateway
" + gateway + " and status "
- + jobStatusResult.getState().name());
-
- logger.info("Job " + jobStatusResult.getJobId() + "
was completed");
-
+ logger.info("Job {} is in state={}", jobId, jobState);
+ if (jobState == JobState.COMPLETE || jobState ==
JobState.FAILED) {
+ // If Job has FAILED, still run output staging tasks
to debug the reason for failure. And
+ // update the experiment status as COMPLETED as job
failures are unrelated to Airavata scope.
+ logger.info("running PostWorkflow for process {} of
experiment {}", processId, experimentId);
executePostWorkflow(processId, gateway, false);
} else if (jobStatusResult.getState() ==
JobState.CANCELED) {
- logger.info("Job " + jobStatusResult.getJobId()
- + " was externally cancelled but process is
not marked as cancelled yet");
+ logger.info("Setting process {} of experiment {} to
state=CANCELED", processId, experimentId);
publishProcessStatus(processId, experimentId, gateway,
ProcessState.CANCELED);
- logger.info("Marked process " + processId + " of
experiment " + experimentId
- + " as cancelled as job is already being
cancelled");
-
- } else if (jobStatusResult.getState() ==
JobState.SUBMITTED) {
- logger.info("Job " + jobStatusResult.getJobId() + "
was submitted");
}
}
return true;
} else {
- logger.warn("Could not find a monitoring register for job id "
+ jobStatusResult.getJobId());
+ logger.warn("Could not find a monitoring register for job id
{}", jobId);
return false;
}
} catch (Exception e) {
- logger.error(
- "Failed to process job : " + jobStatusResult.getJobId() +
", with status : "
- + jobStatusResult.getState().name(),
- e);
+ // Log the locals captured before the try instead of calling .name(): if the state were null,
+ // .name() would throw from inside this handler, masking the original exception and skipping
+ // returnBrokenResource below.
+ logger.error("Failed to process job: {}, with status : {}", jobId, jobState, e);
getRegistryClientPool().returnBrokenResource(registryClient);
return false;
}
@@ -237,21 +210,21 @@ public class PostWorkflowManager extends WorkflowManager {
HelixTaskFactory taskFactory;
try {
processModel = registryClient.getProcess(processId);
- experimentModel =
registryClient.getExperiment(processModel.getExperimentId());
- getRegistryClientPool().returnResource(registryClient);
- ResourceType resourceType = registryClient
- .getGroupComputeResourcePreference(
- processModel.getComputeResourceId(),
processModel.getGroupResourceProfileId())
- .getResourceType();
+ var experimentId = processModel.getExperimentId();
+ var crId = processModel.getComputeResourceId();
+ var grpId = processModel.getGroupResourceProfileId();
+
+ experimentModel = registryClient.getExperiment(experimentId);
+ ResourceType resourceType =
registryClient.getGroupComputeResourcePreference(crId, grpId).getResourceType();
+
taskFactory = TaskFactory.getFactory(resourceType);
logger.info("Initialized task factory for resource type {} for
process {}", resourceType, processId);
} catch (Exception e) {
- logger.error(
- "Failed to fetch experiment or process from registry
associated with process id " + processId, e);
+ logger.error("Failed to fetch experiment/process from registry for
pid={}", processId, e);
+ throw new Exception("Failed to fetch experiment/process from
registry for pid=" + processId, e);
+ } finally {
getRegistryClientPool().returnResource(registryClient);
- throw new Exception(
- "Failed to fetch experiment or process from registry
associated with process id " + processId, e);
}
String taskDag = processModel.getTaskDag();
@@ -264,8 +237,7 @@ public class PostWorkflowManager extends WorkflowManager {
jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
jobVerificationTask.setProcessId(processModel.getProcessId());
- jobVerificationTask.setTaskId(
- "Job-Verification-Task-" + UUID.randomUUID().toString() + "-");
+ jobVerificationTask.setTaskId("Job-Verification-Task-" +
UUID.randomUUID() + "-");
jobVerificationTask.setForceRunTask(forceRun);
jobVerificationTask.setSkipAllStatusPublish(true);
@@ -320,7 +292,7 @@ public class PostWorkflowManager extends WorkflowManager {
completingTask.setGatewayId(experimentModel.getGatewayId());
completingTask.setExperimentId(experimentModel.getExperimentId());
completingTask.setProcessId(processModel.getProcessId());
- completingTask.setTaskId("Completing-Task-" +
UUID.randomUUID().toString() + "-");
+ completingTask.setTaskId("Completing-Task-" + UUID.randomUUID() + "-");
completingTask.setForceRunTask(forceRun);
completingTask.setSkipAllStatusPublish(true);
if (allTasks.size() > 0) {
@@ -341,8 +313,7 @@ public class PostWorkflowManager extends WorkflowManager {
allTasks.add(parsingTriggeringTask);
String workflowName = getWorkflowOperator()
- .launchWorkflow(
- processId + "-POST-" + UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
+ .launchWorkflow(processId + "-POST-" + UUID.randomUUID(), new
ArrayList<>(allTasks), true, false);
registerWorkflowForProcess(processId, workflowName, "POST");
}
@@ -353,7 +324,6 @@ public class PostWorkflowManager extends WorkflowManager {
final Consumer<String, JobStatusResult> consumer = createConsumer();
new Thread(() -> {
while (true) {
-
final ConsumerRecords<String, JobStatusResult>
consumerRecords = consumer.poll(Long.MAX_VALUE);
CompletionService<Boolean> executorCompletionService =
new
ExecutorCompletionService<>(processingPool);
@@ -370,13 +340,11 @@ public class PostWorkflowManager extends WorkflowManager {
record.value().getJobId());
// This avoids kafka read thread to wait until
processing is completed before committing
- // There is a risk of missing 20 messages in
case of a restart but this improves the
- // robustness
- // of the kafka read thread by avoiding wait
timeouts
+ // There is a risk of missing 20 messages in
case of a restart, but this improves the
+ // robustness of the kafka read thread by
avoiding wait timeouts
processingFutures.add(executorCompletionService.submit(() -> {
boolean success = process(record.value());
- logger.info("Status of processing "
- + record.value().getJobId() + " :
" + success);
+ logger.info("Status of processing {} :
{}", record.value().getJobId(), success);
return success;
}));
@@ -440,19 +408,4 @@ public class PostWorkflowManager extends WorkflowManager {
}
public void stopServer() {}
-
- public static void main(String[] args) throws Exception {
-
- if
(ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
- MonitoringServer monitoringServer = new MonitoringServer(
-
ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
-
ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
- monitoringServer.start();
-
- Runtime.getRuntime().addShutdownHook(new
Thread(monitoringServer::stop));
- }
-
- PostWorkflowManager postManager = new PostWorkflowManager();
- postManager.startServer();
- }
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
index e569a29be3..ce29676cc8 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
@@ -21,7 +21,6 @@ package org.apache.airavata.monitor;
import java.time.Duration;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftClientPool;
@@ -29,10 +28,6 @@ import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.monitor.kafka.MessageProducer;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +35,10 @@ public class AbstractMonitor {
private static final Logger log =
LoggerFactory.getLogger(AbstractMonitor.class);
- private MessageProducer messageProducer;
- private CuratorFramework curatorClient;
+ private final MessageProducer messageProducer;
private ThriftClientPool<RegistryService.Client> registryClientPool;
public AbstractMonitor() throws ApplicationSettingsException {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- this.curatorClient =
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(),
retryPolicy);
- this.curatorClient.start();
this.initRegistryClientPool();
messageProducer = new MessageProducer();
}
@@ -79,16 +70,19 @@ public class AbstractMonitor {
log.info("Fetching matching jobs for job id {} from registry",
jobStatusResult.getJobId());
List<JobModel> jobs = registryClient.getJobs("jobId",
jobStatusResult.getJobId());
- if (jobs.size() > 0) {
- log.info("Filtering total " + jobs.size() + " with target job
name " + jobStatusResult.getJobName());
+ if (!jobs.isEmpty()) {
+ log.info("Filtering total {} with target job name {}",
jobs.size(), jobStatusResult.getJobName());
jobs = jobs.stream()
.filter(jm ->
jm.getJobName().equals(jobStatusResult.getJobName()))
- .collect(Collectors.toList());
+ .toList();
}
if (jobs.size() != 1) {
- log.error("Couldn't find exactly one job with id " +
jobStatusResult.getJobId() + " and name "
- + jobStatusResult.getJobName() + " in the registry.
Count " + jobs.size());
+ log.error(
+ "Couldn't find exactly one job with id {} and name {}
in the registry. Count {}",
+ jobStatusResult.getJobId(),
+ jobStatusResult.getJobName(),
+ jobs.size());
validated = false;
} else {
@@ -98,11 +92,14 @@ public class AbstractMonitor {
String experimentId =
registryClient.getProcess(processId).getExperimentId();
if (experimentId != null && processId != null) {
- log.info("Job id " + jobStatusResult.getJobId() + " is
owned by process " + processId
- + " of experiment " + experimentId);
+ log.info(
+ "Job id {} is owned by process {} of experiment
{}",
+ jobStatusResult.getJobId(),
+ processId,
+ experimentId);
validated = true;
} else {
- log.error("Experiment or process is null for job " +
jobStatusResult.getJobId());
+ log.error("Experiment or process is null for job {}",
jobStatusResult.getJobId());
validated = false;
}
}
@@ -110,7 +107,7 @@ public class AbstractMonitor {
return validated;
} catch (Exception e) {
- log.error("Error at validating job status " +
jobStatusResult.getJobId(), e);
+ log.error("Error at validating job status {}",
jobStatusResult.getJobId(), e);
getRegistryClientPool().returnBrokenResource(registryClient);
return false;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index 7e36df285d..1bfb591e56 100644
---
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -29,6 +29,7 @@ import jakarta.mail.Store;
import jakarta.mail.search.FlagTerm;
import jakarta.mail.search.SearchTerm;
import java.io.InputStream;
+import java.time.Duration;
import java.util.*;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ApplicationSettings;
@@ -50,17 +51,14 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
private static final String IMAPS = "imaps";
private static final String POP3 = "pop3";
- private boolean stopMonitoring = false;
- private Session session;
private Store store;
private Folder emailFolder;
private Properties properties;
private String host, emailAddress, password, storeProtocol, folderName;
- private Map<ResourceJobManagerType, EmailParser> emailParserMap =
- new HashMap<ResourceJobManagerType, EmailParser>();
- private Map<String, ResourceJobManagerType> addressMap = new HashMap<>();
+ private final Map<ResourceJobManagerType, EmailParser> emailParserMap =
new HashMap<>();
+ private final Map<String, ResourceJobManagerType> addressMap = new
HashMap<>();
private Message[] flushUnseenMessages;
- private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new
HashMap<>();
+ private final Map<ResourceJobManagerType, ResourceConfig> resourceConfigs
= new HashMap<>();
private long emailExpirationTimeMinutes;
private String publisherId;
@@ -84,7 +82,6 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
}
properties = new Properties();
properties.put("mail.store.protocol", storeProtocol);
- long period = 1000 * 60 * 5; // five minute delay between successive
task executions.
}
private void loadContext() throws Exception {
@@ -184,16 +181,16 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
@Override
public void run() {
- while (!stopMonitoring && !ServerSettings.isStopAllThreads()) {
+ while (!ServerSettings.isStopAllThreads()) {
try {
- session = Session.getDefaultInstance(properties);
+ Session session = Session.getDefaultInstance(properties);
store = session.getStore(storeProtocol);
store.connect(host, emailAddress, password);
emailFolder = store.getFolder(folderName);
- // first time we search for all unread messages.
+ // first we search for all unread messages.
SearchTerm unseenBefore = new FlagTerm(new
Flags(Flags.Flag.SEEN), false);
- while (!(stopMonitoring || ServerSettings.isStopAllThreads()))
{
- Thread.sleep(ServerSettings.getEmailMonitorPeriod()); //
sleep a bit - get a rest till job finishes
+ while (!ServerSettings.isStopAllThreads()) {
+ Thread.sleep(ServerSettings.getEmailMonitorPeriod()); //
sleep for long enough
if (!store.isConnected()) {
store.connect();
emailFolder = store.getFolder(folderName);
@@ -221,7 +218,7 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
if (searchMessages == null || searchMessages.length ==
0) {
log.info("[EJM]: No new email messages");
} else {
- log.info("[EJM]: " + searchMessages.length + " new
email/s received");
+ log.info("[EJM]: {} new email/s received",
searchMessages.length);
processMessages(searchMessages);
}
emailFolder.close(false);
@@ -257,21 +254,21 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
List<Message> processedMessages = new ArrayList<>();
List<Message> unreadMessages = new ArrayList<>();
for (Message message : searchMessages) {
+ var msgHash = message.hashCode();
try {
- log.info("Received Job Status [{}]: {}", publisherId, message);
JobStatusResult jobStatusResult = parse(message, publisherId);
+ log.info("read JobStatusUpdate<{}> from {}: {}", msgHash,
publisherId, jobStatusResult);
submitJobStatus(jobStatusResult);
- log.info("Submitted the job {} status to queue",
jobStatusResult.getJobId());
processedMessages.add(message);
} catch (Exception e) {
- log.error("Error in submitting job status to queue", e);
- if ((System.currentTimeMillis() -
message.getReceivedDate().getTime())
- > emailExpirationTimeMinutes * 60 * 1000) {
- log.warn("Marking job status email as read as it was
expired");
+ var msgTime = message.getReceivedDate().getTime();
+ var msgExpiryTime =
+ msgTime +
Duration.ofMinutes(emailExpirationTimeMinutes).toMillis();
+ if (System.currentTimeMillis() > msgExpiryTime) {
processedMessages.add(message);
+ log.error("cannot read JobStatusUpdate<{}> from {}. marked
as timeout", msgHash, publisherId, e);
} else {
- log.warn("Keeping job status email as unread untill it is
expired in " + emailExpirationTimeMinutes
- + " minutes. Email received time " +
message.getReceivedDate());
+ log.error("cannot read JobStatusUpdate<{}> from {}. marked
as requeue", msgHash, publisherId, e);
unreadMessages.add(message);
}
}
@@ -294,13 +291,12 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
try {
emailFolder.setFlags(unseenMessages, new
Flags(Flags.Flag.SEEN), false);
} catch (MessagingException e) {
+ // anyway we need to push this update.
if (!store.isConnected()) {
store.connect();
emailFolder.setFlags(unseenMessages, new
Flags(Flags.Flag.SEEN), false);
- flushUnseenMessages = unseenMessages; // anyway we need to
push this update.
- } else {
- flushUnseenMessages = unseenMessages; // anyway we need to
push this update.
}
+ flushUnseenMessages = unseenMessages;
}
}
}
@@ -311,7 +307,7 @@ public class EmailBasedMonitor extends AbstractMonitor
implements Runnable {
t.join();
}
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
EmailBasedMonitor monitor = new EmailBasedMonitor();
monitor.startServer();
}