This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d482fe0f2 refine monitor codes
1d482fe0f2 is described below

commit 1d482fe0f2081de45cdf3d9fb6c633002a40b7fd
Author: yasithdev <[email protected]>
AuthorDate: Tue Jul 15 01:37:22 2025 -0500

    refine monitor codes
---
 .../helix/impl/workflow/PostWorkflowManager.java   | 193 ++++++++-------------
 .../apache/airavata/monitor/AbstractMonitor.java   |  35 ++--
 .../airavata/monitor/email/EmailBasedMonitor.java  |  46 +++--
 3 files changed, 110 insertions(+), 164 deletions(-)

diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 8fe307cbae..a0dc3be935 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -19,19 +19,8 @@
 */
 package org.apache.airavata.helix.impl.workflow;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.*;
+import java.util.concurrent.*;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -61,12 +50,7 @@ import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
 import org.apache.airavata.patform.monitoring.CountMonitor;
 import org.apache.airavata.patform.monitoring.MonitoringServer;
 import org.apache.airavata.registry.api.RegistryService;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
@@ -77,7 +61,7 @@ public class PostWorkflowManager extends WorkflowManager {
     private static final Logger logger = 
LoggerFactory.getLogger(PostWorkflowManager.class);
     private static final CountMonitor postwfCounter = new 
CountMonitor("post_wf_counter");
 
-    private ExecutorService processingPool = Executors.newFixedThreadPool(10);
+    private final ExecutorService processingPool = 
Executors.newFixedThreadPool(10);
 
     public PostWorkflowManager() throws ApplicationSettingsException {
         super(
@@ -85,6 +69,21 @@ public class PostWorkflowManager extends WorkflowManager {
                 
Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
     }
 
+    public static void main(String[] args) throws Exception {
+
+        if 
(ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
+            MonitoringServer monitoringServer = new MonitoringServer(
+                    
ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
+                    
ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
+            monitoringServer.start();
+
+            Runtime.getRuntime().addShutdownHook(new 
Thread(monitoringServer::stop));
+        }
+
+        PostWorkflowManager postManager = new PostWorkflowManager();
+        postManager.startServer();
+    }
+
     private void init() throws Exception {
         super.initComponents();
     }
@@ -113,115 +112,89 @@ public class PostWorkflowManager extends WorkflowManager {
 
         RegistryService.Client registryClient = 
getRegistryClientPool().getResource();
 
-        try {
-            logger.info("Processing job result of job id " + 
jobStatusResult.getJobId() + " sent by "
-                    + jobStatusResult.getPublisherName());
-
-            List<JobModel> jobs = registryClient.getJobs("jobId", 
jobStatusResult.getJobId());
+        var jobId = jobStatusResult.getJobId();
+        var jobName = jobStatusResult.getJobName();
+        var jobState = jobStatusResult.getState();
+        var publisherId = jobStatusResult.getPublisherName();
+        logger.info("processing JobStatusUpdate<{}> from {}: {}", jobId, 
publisherId, jobStatusResult);
 
-            if (jobs.size() > 0) {
-                logger.info("Filtering total " + jobs.size() + " with target 
job name " + jobStatusResult.getJobName());
+        try {
+            List<JobModel> jobs = registryClient.getJobs("jobId", jobId);
+            logger.info("Found {} jobs in registry with id={}", jobs.size(), 
jobId);
+            if (!jobs.isEmpty()) {
                 jobs = jobs.stream()
-                        .filter(jm -> 
jm.getJobName().equals(jobStatusResult.getJobName()))
-                        .collect(Collectors.toList());
+                        .filter(jm -> jm.getJobName().equals(jobName))
+                        .toList();
+                logger.info("Found {} jobs in registry with id={} and 
name={}", jobs.size(), jobId, jobName);
             }
-
             if (jobs.size() != 1) {
-                logger.error("Couldn't find exactly one job with id " + 
jobStatusResult.getJobId() + " and name "
-                        + jobStatusResult.getJobName() + " in the registry. 
Count " + jobs.size());
+                logger.error("Found {} job(s) in registry with id={} and 
name={}", jobs.size(), jobId, jobName);
                 getRegistryClientPool().returnResource(registryClient);
                 return false;
             }
-
             JobModel jobModel = jobs.get(0);
             ProcessModel processModel = 
registryClient.getProcess(jobModel.getProcessId());
             ExperimentModel experimentModel = 
registryClient.getExperiment(processModel.getExperimentId());
             ProcessStatus processStatus = 
registryClient.getProcessStatus(processModel.getProcessId());
 
+            var processState = processStatus.getState();
             getRegistryClientPool().returnResource(registryClient);
 
-            if (processModel != null && experimentModel != null) {
-
-                jobModel.getJobStatuses()
-                        
.sort(Comparator.comparingLong(JobStatus::getTimeOfStateChange)
-                                .reversed());
+            if (experimentModel != null) {
+                
jobModel.getJobStatuses().sort(Comparator.comparingLong(JobStatus::getTimeOfStateChange).reversed());
                 JobState currentJobStatus = 
jobModel.getJobStatuses().get(0).getJobState();
+                logger.info("Last known state of job {} is {}", jobId, 
jobName);
 
-                logger.info("Last known state of job " + jobModel.getJobId() + 
" is " + currentJobStatus.name());
-
-                if (!JobStateValidator.isValid(currentJobStatus, 
jobStatusResult.getState())) {
-                    logger.warn("Job state of " + jobStatusResult.getJobId() + 
" is not valid. Previous state "
-                            + currentJobStatus + ", new state " + 
jobStatusResult.getState());
+                if (!JobStateValidator.isValid(currentJobStatus, jobState)) {
+                    logger.warn("JobStatusUpdate<{}> invalid. prev={} -> 
new={}", jobId, currentJobStatus, jobState);
                     return true;
                 }
 
-                String gateway = experimentModel.getGatewayId();
+                String task = jobModel.getTaskId();
                 String processId = processModel.getProcessId();
+                String gateway = experimentModel.getGatewayId();
                 String experimentId = experimentModel.getExperimentId();
-                String task = jobModel.getTaskId();
-
-                logger.info("Updating the job status for job id : " + 
jobStatusResult.getJobId() + " with process id "
-                        + processId + ", exp id " + experimentId + ", gateway 
" + gateway + " and status "
-                        + jobStatusResult.getState().name());
 
-                saveAndPublishJobStatus(
-                        jobStatusResult.getJobId(), task, processId, 
experimentId, gateway, jobStatusResult.getState());
+                logger.info("saving JobStatusUpdate<{}>: pid={}, eid={}, 
gw={}, state={}",
+                        jobId, processId, experimentId, gateway, jobState);
+                saveAndPublishJobStatus(jobId, task, processId, experimentId, 
gateway, jobState);
 
                 // TODO get cluster lock before that
-                if (ProcessState.CANCELLING.equals(processStatus.getState())
-                        || 
ProcessState.CANCELED.equals(processStatus.getState())) {
-                    logger.info("Cancelled post workflow for process " + 
processId + " in experiment " + experimentId);
-                    // This will mark an cancelling Experiment into a 
cancelled status for a set of valid job statuses
+                if (ProcessState.CANCELLING.equals(processState) || 
ProcessState.CANCELED.equals(processState)) {
+                    logger.info("Cancelled post workflow for process {} in 
experiment {}", processId, experimentId);
+                    // This will mark a canceling Experiment with CANCELED 
status for a set of valid job statuses
                     // This is a safety check. Cancellation is originally 
handled in Job Cancellation Workflow
-                    switch (jobStatusResult.getState()) {
+                    switch (jobState) {
                         case FAILED:
                         case SUSPENDED:
                         case CANCELED:
                         case COMPLETE:
-                            logger.info("Job " + jobStatusResult.getJobId() + 
" status is " + jobStatusResult.getState()
-                                    + " so marking experiment " + experimentId 
+ " as cancelled");
+                            logger.info("canceled job={}: eid={}, state={}", 
jobId, experimentId, jobState);
                             publishProcessStatus(processId, experimentId, 
gateway, ProcessState.CANCELED);
                             break;
                         default:
-                            logger.warn("Job " + jobStatusResult.getJobId() + 
" status " + jobStatusResult.getState()
-                                    + " is invalid to mark experiment " + 
experimentId + " as cancelled");
+                            logger.warn("skipping job={}: eid={}, state={}", 
jobId, experimentId, jobState);
                     }
                 } else {
-
-                    if (jobStatusResult.getState() == JobState.COMPLETE
-                            || jobStatusResult.getState() == JobState.FAILED) {
-                        // if the job is FAILED, still run output staging 
tasks to debug the reason for failure. And
-                        // update
-                        // the experiment status as COMPLETED as this job 
failure is not related to Airavata scope.
-                        logger.info("Starting the post workflow for job id : " 
+ jobStatusResult.getJobId()
-                                + " with process id " + processId + ", gateway 
" + gateway + " and status "
-                                + jobStatusResult.getState().name());
-
-                        logger.info("Job " + jobStatusResult.getJobId() + " 
was completed");
-
+                    logger.info("Job {} is in state={}", jobId, jobState);
+                    if (jobState == JobState.COMPLETE || jobState == 
JobState.FAILED) {
+                        // If Job has FAILED, still run output staging tasks 
to debug the reason for failure. And
+                        // update the experiment status as COMPLETED as job 
failures are unrelated to Airavata scope.
+                        logger.info("running PostWorkflow for process {} of 
experiment {}", processId, experimentId);
                         executePostWorkflow(processId, gateway, false);
 
                     } else if (jobStatusResult.getState() == 
JobState.CANCELED) {
-                        logger.info("Job " + jobStatusResult.getJobId()
-                                + " was externally cancelled but process is 
not marked as cancelled yet");
+                        logger.info("Setting process {} of experiment {} to 
state=CANCELED", processId, experimentId);
                         publishProcessStatus(processId, experimentId, gateway, 
ProcessState.CANCELED);
-                        logger.info("Marked process " + processId + " of 
experiment " + experimentId
-                                + " as cancelled as job is already being 
cancelled");
-
-                    } else if (jobStatusResult.getState() == 
JobState.SUBMITTED) {
-                        logger.info("Job " + jobStatusResult.getJobId() + " 
was submitted");
                     }
                 }
                 return true;
             } else {
-                logger.warn("Could not find a monitoring register for job id " 
+ jobStatusResult.getJobId());
+                logger.warn("Could not find a monitoring register for job id 
{}", jobId);
                 return false;
             }
         } catch (Exception e) {
-            logger.error(
-                    "Failed to process job : " + jobStatusResult.getJobId() + 
", with status : "
-                            + jobStatusResult.getState().name(),
-                    e);
+            logger.error("Failed to process job: {}, with status : {}", 
jobStatusResult.getJobId(), jobStatusResult.getState().name(), e);
             getRegistryClientPool().returnBrokenResource(registryClient);
             return false;
         }
@@ -237,21 +210,21 @@ public class PostWorkflowManager extends WorkflowManager {
         HelixTaskFactory taskFactory;
         try {
             processModel = registryClient.getProcess(processId);
-            experimentModel = 
registryClient.getExperiment(processModel.getExperimentId());
-            getRegistryClientPool().returnResource(registryClient);
-            ResourceType resourceType = registryClient
-                    .getGroupComputeResourcePreference(
-                            processModel.getComputeResourceId(), 
processModel.getGroupResourceProfileId())
-                    .getResourceType();
+            var experimentId = processModel.getExperimentId();
+            var crId = processModel.getComputeResourceId();
+            var grpId = processModel.getGroupResourceProfileId();
+
+            experimentModel = registryClient.getExperiment(experimentId);
+            ResourceType resourceType = 
registryClient.getGroupComputeResourcePreference(crId, grpId).getResourceType();
+
             taskFactory = TaskFactory.getFactory(resourceType);
             logger.info("Initialized task factory for resource type {} for 
process {}", resourceType, processId);
 
         } catch (Exception e) {
-            logger.error(
-                    "Failed to fetch experiment or process from registry 
associated with process id " + processId, e);
+            logger.error("Failed to fetch experiment/process from registry for 
pid={}", processId, e);
+            throw new Exception("Failed to fetch experiment/process from 
registry for pid=" + processId, e);
+        } finally {
             getRegistryClientPool().returnResource(registryClient);
-            throw new Exception(
-                    "Failed to fetch experiment or process from registry 
associated with process id " + processId, e);
         }
 
         String taskDag = processModel.getTaskDag();
@@ -264,8 +237,7 @@ public class PostWorkflowManager extends WorkflowManager {
         jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
         jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
         jobVerificationTask.setProcessId(processModel.getProcessId());
-        jobVerificationTask.setTaskId(
-                "Job-Verification-Task-" + UUID.randomUUID().toString() + "-");
+        jobVerificationTask.setTaskId("Job-Verification-Task-" + 
UUID.randomUUID() + "-");
         jobVerificationTask.setForceRunTask(forceRun);
         jobVerificationTask.setSkipAllStatusPublish(true);
 
@@ -320,7 +292,7 @@ public class PostWorkflowManager extends WorkflowManager {
         completingTask.setGatewayId(experimentModel.getGatewayId());
         completingTask.setExperimentId(experimentModel.getExperimentId());
         completingTask.setProcessId(processModel.getProcessId());
-        completingTask.setTaskId("Completing-Task-" + 
UUID.randomUUID().toString() + "-");
+        completingTask.setTaskId("Completing-Task-" + UUID.randomUUID() + "-");
         completingTask.setForceRunTask(forceRun);
         completingTask.setSkipAllStatusPublish(true);
         if (allTasks.size() > 0) {
@@ -341,8 +313,7 @@ public class PostWorkflowManager extends WorkflowManager {
         allTasks.add(parsingTriggeringTask);
 
         String workflowName = getWorkflowOperator()
-                .launchWorkflow(
-                        processId + "-POST-" + UUID.randomUUID().toString(), 
new ArrayList<>(allTasks), true, false);
+                .launchWorkflow(processId + "-POST-" + UUID.randomUUID(), new 
ArrayList<>(allTasks), true, false);
 
         registerWorkflowForProcess(processId, workflowName, "POST");
     }
@@ -353,7 +324,6 @@ public class PostWorkflowManager extends WorkflowManager {
         final Consumer<String, JobStatusResult> consumer = createConsumer();
         new Thread(() -> {
                     while (true) {
-
                         final ConsumerRecords<String, JobStatusResult> 
consumerRecords = consumer.poll(Long.MAX_VALUE);
                         CompletionService<Boolean> executorCompletionService =
                                 new 
ExecutorCompletionService<>(processingPool);
@@ -370,13 +340,11 @@ public class PostWorkflowManager extends WorkflowManager {
                                         record.value().getJobId());
 
                                 // This avoids kafka read thread to wait until 
processing is completed before committing
-                                // There is a risk of missing 20 messages in 
case of a restart but this improves the
-                                // robustness
-                                // of the kafka read thread by avoiding wait 
timeouts
+                                // There is a risk of missing 20 messages in 
case of a restart, but this improves the
+                                // robustness of the kafka read thread by 
avoiding wait timeouts
                                 
processingFutures.add(executorCompletionService.submit(() -> {
                                     boolean success = process(record.value());
-                                    logger.info("Status of processing "
-                                            + record.value().getJobId() + " : 
" + success);
+                                    logger.info("Status of processing {} : 
{}", record.value().getJobId(), success);
                                     return success;
                                 }));
 
@@ -440,19 +408,4 @@ public class PostWorkflowManager extends WorkflowManager {
     }
 
     public void stopServer() {}
-
-    public static void main(String[] args) throws Exception {
-
-        if 
(ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
-            MonitoringServer monitoringServer = new MonitoringServer(
-                    
ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
-                    
ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
-            monitoringServer.start();
-
-            Runtime.getRuntime().addShutdownHook(new 
Thread(monitoringServer::stop));
-        }
-
-        PostWorkflowManager postManager = new PostWorkflowManager();
-        postManager.startServer();
-    }
 }
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
index e569a29be3..ce29676cc8 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
@@ -21,7 +21,6 @@ package org.apache.airavata.monitor;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftClientPool;
@@ -29,10 +28,6 @@ import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.monitor.kafka.MessageProducer;
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +35,10 @@ public class AbstractMonitor {
 
     private static final Logger log = 
LoggerFactory.getLogger(AbstractMonitor.class);
 
-    private MessageProducer messageProducer;
-    private CuratorFramework curatorClient;
+    private final MessageProducer messageProducer;
     private ThriftClientPool<RegistryService.Client> registryClientPool;
 
     public AbstractMonitor() throws ApplicationSettingsException {
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        this.curatorClient = 
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), 
retryPolicy);
-        this.curatorClient.start();
         this.initRegistryClientPool();
         messageProducer = new MessageProducer();
     }
@@ -79,16 +70,19 @@ public class AbstractMonitor {
             log.info("Fetching matching jobs for job id {} from registry", 
jobStatusResult.getJobId());
             List<JobModel> jobs = registryClient.getJobs("jobId", 
jobStatusResult.getJobId());
 
-            if (jobs.size() > 0) {
-                log.info("Filtering total " + jobs.size() + " with target job 
name " + jobStatusResult.getJobName());
+            if (!jobs.isEmpty()) {
+                log.info("Filtering total {} with target job name {}", 
jobs.size(), jobStatusResult.getJobName());
                 jobs = jobs.stream()
                         .filter(jm -> 
jm.getJobName().equals(jobStatusResult.getJobName()))
-                        .collect(Collectors.toList());
+                        .toList();
             }
 
             if (jobs.size() != 1) {
-                log.error("Couldn't find exactly one job with id " + 
jobStatusResult.getJobId() + " and name "
-                        + jobStatusResult.getJobName() + " in the registry. 
Count " + jobs.size());
+                log.error(
+                        "Couldn't find exactly one job with id {} and name {} 
in the registry. Count {}",
+                        jobStatusResult.getJobId(),
+                        jobStatusResult.getJobName(),
+                        jobs.size());
                 validated = false;
 
             } else {
@@ -98,11 +92,14 @@ public class AbstractMonitor {
                 String experimentId = 
registryClient.getProcess(processId).getExperimentId();
 
                 if (experimentId != null && processId != null) {
-                    log.info("Job id " + jobStatusResult.getJobId() + " is 
owned by process " + processId
-                            + " of experiment " + experimentId);
+                    log.info(
+                            "Job id {} is owned by process {} of experiment 
{}",
+                            jobStatusResult.getJobId(),
+                            processId,
+                            experimentId);
                     validated = true;
                 } else {
-                    log.error("Experiment or process is null for job " + 
jobStatusResult.getJobId());
+                    log.error("Experiment or process is null for job {}", 
jobStatusResult.getJobId());
                     validated = false;
                 }
             }
@@ -110,7 +107,7 @@ public class AbstractMonitor {
             return validated;
 
         } catch (Exception e) {
-            log.error("Error at validating job status " + 
jobStatusResult.getJobId(), e);
+            log.error("Error at validating job status {}", 
jobStatusResult.getJobId(), e);
             getRegistryClientPool().returnBrokenResource(registryClient);
             return false;
         }
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index 7e36df285d..1bfb591e56 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -29,6 +29,7 @@ import jakarta.mail.Store;
 import jakarta.mail.search.FlagTerm;
 import jakarta.mail.search.SearchTerm;
 import java.io.InputStream;
+import java.time.Duration;
 import java.util.*;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ApplicationSettings;
@@ -50,17 +51,14 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
     private static final String IMAPS = "imaps";
     private static final String POP3 = "pop3";
 
-    private boolean stopMonitoring = false;
-    private Session session;
     private Store store;
     private Folder emailFolder;
     private Properties properties;
     private String host, emailAddress, password, storeProtocol, folderName;
-    private Map<ResourceJobManagerType, EmailParser> emailParserMap =
-            new HashMap<ResourceJobManagerType, EmailParser>();
-    private Map<String, ResourceJobManagerType> addressMap = new HashMap<>();
+    private final Map<ResourceJobManagerType, EmailParser> emailParserMap = 
new HashMap<>();
+    private final Map<String, ResourceJobManagerType> addressMap = new 
HashMap<>();
     private Message[] flushUnseenMessages;
-    private Map<ResourceJobManagerType, ResourceConfig> resourceConfigs = new 
HashMap<>();
+    private final Map<ResourceJobManagerType, ResourceConfig> resourceConfigs 
= new HashMap<>();
     private long emailExpirationTimeMinutes;
     private String publisherId;
 
@@ -84,7 +82,6 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         }
         properties = new Properties();
         properties.put("mail.store.protocol", storeProtocol);
-        long period = 1000 * 60 * 5; // five minute delay between successive 
task executions.
     }
 
     private void loadContext() throws Exception {
@@ -184,16 +181,16 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
     @Override
     public void run() {
 
-        while (!stopMonitoring && !ServerSettings.isStopAllThreads()) {
+        while (!ServerSettings.isStopAllThreads()) {
             try {
-                session = Session.getDefaultInstance(properties);
+                Session session = Session.getDefaultInstance(properties);
                 store = session.getStore(storeProtocol);
                 store.connect(host, emailAddress, password);
                 emailFolder = store.getFolder(folderName);
-                // first time we search for all unread messages.
+                // first we search for all unread messages.
                 SearchTerm unseenBefore = new FlagTerm(new 
Flags(Flags.Flag.SEEN), false);
-                while (!(stopMonitoring || ServerSettings.isStopAllThreads())) 
{
-                    Thread.sleep(ServerSettings.getEmailMonitorPeriod()); // 
sleep a bit - get a rest till job finishes
+                while (!ServerSettings.isStopAllThreads()) {
+                    Thread.sleep(ServerSettings.getEmailMonitorPeriod()); // 
sleep for long enough
                     if (!store.isConnected()) {
                         store.connect();
                         emailFolder = store.getFolder(folderName);
@@ -221,7 +218,7 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
                         if (searchMessages == null || searchMessages.length == 
0) {
                             log.info("[EJM]: No new email messages");
                         } else {
-                            log.info("[EJM]: " + searchMessages.length + " new 
email/s received");
+                            log.info("[EJM]: {} new email/s received", 
searchMessages.length);
                             processMessages(searchMessages);
                         }
                         emailFolder.close(false);
@@ -257,21 +254,21 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         List<Message> processedMessages = new ArrayList<>();
         List<Message> unreadMessages = new ArrayList<>();
         for (Message message : searchMessages) {
+            var msgHash = message.hashCode();
             try {
-                log.info("Received Job Status [{}]: {}", publisherId, message);
                 JobStatusResult jobStatusResult = parse(message, publisherId);
+                log.info("read JobStatusUpdate<{}> from {}: {}", msgHash, 
publisherId, jobStatusResult);
                 submitJobStatus(jobStatusResult);
-                log.info("Submitted the job {} status to queue", 
jobStatusResult.getJobId());
                 processedMessages.add(message);
             } catch (Exception e) {
-                log.error("Error in submitting job status to queue", e);
-                if ((System.currentTimeMillis() - 
message.getReceivedDate().getTime())
-                        > emailExpirationTimeMinutes * 60 * 1000) {
-                    log.warn("Marking job status email as read as it was 
expired");
+                var msgTime = message.getReceivedDate().getTime();
+                var msgExpiryTime =
+                        msgTime + 
Duration.ofMinutes(emailExpirationTimeMinutes).toMillis();
+                if (System.currentTimeMillis() > msgExpiryTime) {
                     processedMessages.add(message);
+                    log.error("cannot read JobStatusUpdate<{}> from {}. marked 
as timeout", msgHash, publisherId, e);
                 } else {
-                    log.warn("Keeping job status email as unread untill it is 
expired in " + emailExpirationTimeMinutes
-                            + " minutes. Email received time " + 
message.getReceivedDate());
+                    log.error("cannot read JobStatusUpdate<{}> from {}. marked 
as requeue", msgHash, publisherId, e);
                     unreadMessages.add(message);
                 }
             }
@@ -294,13 +291,12 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
             try {
                 emailFolder.setFlags(unseenMessages, new 
Flags(Flags.Flag.SEEN), false);
             } catch (MessagingException e) {
+                // anyway we need to push this update.
                 if (!store.isConnected()) {
                     store.connect();
                     emailFolder.setFlags(unseenMessages, new 
Flags(Flags.Flag.SEEN), false);
-                    flushUnseenMessages = unseenMessages; // anyway we need to 
push this update.
-                } else {
-                    flushUnseenMessages = unseenMessages; // anyway we need to 
push this update.
                 }
+                flushUnseenMessages = unseenMessages;
             }
         }
     }
@@ -311,7 +307,7 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable {
         t.join();
     }
 
-    public static void main(String args[]) throws Exception {
+    public static void main(String[] args) throws Exception {
         EmailBasedMonitor monitor = new EmailBasedMonitor();
         monitor.startServer();
     }

Reply via email to