This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch merge-svcs
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 7749cb85f06368f1f59f45ae2c77a82e55829fa8
Author: yasithdev <[email protected]>
AuthorDate: Tue Aug 19 18:22:04 2025 -0400

    add correct service name to threads. fix bugs, add missing resources and properties.
---
 .devcontainer/docker-compose-alt.yml               |   2 +-
 .../src/main/assembly/api-server-bin-assembly.xml  |   1 +
 .../src/main/java/org/apache/airavata/Main.java    |  17 +++
 .../org/apache/airavata/api/AiravataAPIServer.java |   4 +-
 .../airavata/common/utils/ServerSettings.java      |   5 +
 .../db/event/manager/DBEventManagerRunner.java     |   2 +-
 .../helix/core/participant/HelixParticipant.java   |  23 ++--
 .../helix/impl/controller/HelixController.java     |   2 +-
 .../helix/impl/workflow/PostWorkflowManager.java   | 139 ++++++++++----------
 .../helix/impl/workflow/PreWorkflowManager.java    |  17 ++-
 .../airavata/monitor/email/EmailBasedMonitor.java  | 119 +++++++++--------
 .../monitor/platform/MonitoringServer.java         |   6 +-
 .../airavata/monitor/realtime/RealtimeMonitor.java |  49 ++++---
 .../src/main/resources/airavata-server.properties  |   1 +
 .../src/main/resources/distribution/bin/setenv.sh  | 146 +++++++++++++++++++++
 .../{email-config.yaml => email-config.yml}        |   0
 airavata-api/src/main/resources/log4j2.xml         |   1 +
 .../templates/airavata-server.properties.j2        |   1 +
 .../participant/airavata-server.properties.j2      |   2 +
 .../deployment-scripts/airavata-server.properties  |   1 +
 20 files changed, 376 insertions(+), 162 deletions(-)

diff --git a/.devcontainer/docker-compose-alt.yml 
b/.devcontainer/docker-compose-alt.yml
index ddf80cb50e..8928180c3d 100644
--- a/.devcontainer/docker-compose-alt.yml
+++ b/.devcontainer/docker-compose-alt.yml
@@ -159,7 +159,6 @@ services:
       - zookeeper.server.connection=zookeeper:2181
       - api.server.host=apiserver
       - rabbitmq.broker.url=amqp://guest:guest@rabbitmq:5672/develop
-      - email.based.monitor.address=CHANGEME
       - realtime.monitor.enabled=false
     command: ["/tmp/wait-for-it.sh", "zookeeper:2181", "--", 
"/tmp/wait-for-it.sh", "apiserver:8970", "--" , "/tmp/wait-for-it.sh", 
"rabbitmq:5672", "--", "/opt/apache-airavata-participant/bin/participant.sh"]
 
@@ -177,6 +176,7 @@ services:
       - zookeeper.server.connection=zookeeper:2181
       - api.server.host=apiserver
       - api.server.port=8970
+      - email.based.monitor.host=CHANGEME
       - email.based.monitor.address=CHANGEME
       - email.based.monitor.password=CHANGEME
       - kafka.broker.url=kafka:9092
diff --git a/airavata-api/src/main/assembly/api-server-bin-assembly.xml 
b/airavata-api/src/main/assembly/api-server-bin-assembly.xml
index a4ba2f2ae6..9bfea07815 100644
--- a/airavata-api/src/main/assembly/api-server-bin-assembly.xml
+++ b/airavata-api/src/main/assembly/api-server-bin-assembly.xml
@@ -62,6 +62,7 @@
         <include>templates/*.template</include>
         <include>*.properties</include>
         <include>*.xml</include>
+        <include>*.yml</include>
       </includes>
     </fileSet>
 
diff --git a/airavata-api/src/main/java/org/apache/airavata/Main.java 
b/airavata-api/src/main/java/org/apache/airavata/Main.java
index b843b29069..493270b0ee 100644
--- a/airavata-api/src/main/java/org/apache/airavata/Main.java
+++ b/airavata-api/src/main/java/org/apache/airavata/Main.java
@@ -20,6 +20,7 @@
 package org.apache.airavata;
 
 import org.apache.airavata.api.AiravataAPIServer;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.db.event.manager.DBEventManagerRunner;
 import org.apache.airavata.helix.impl.controller.HelixController;
 import org.apache.airavata.helix.impl.participant.GlobalParticipant;
@@ -27,6 +28,7 @@ import 
org.apache.airavata.helix.impl.workflow.PostWorkflowManager;
 import org.apache.airavata.helix.impl.workflow.PreWorkflowManager;
 import org.apache.airavata.monitor.cluster.ClusterStatusMonitorJobScheduler;
 import org.apache.airavata.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.monitor.platform.MonitoringServer;
 import org.apache.airavata.monitor.realtime.RealtimeMonitor;
 
 public class Main {
@@ -69,5 +71,20 @@ public class Main {
         var jobScheduler = new ClusterStatusMonitorJobScheduler();
         assert jobScheduler != null;
         // jobScheduler.scheduleClusterStatusMonitoring();
+
+        if 
(ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
+            var monitoringServer = new MonitoringServer(
+                    
ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
+                    
ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
+            monitoringServer.start();
+            Runtime.getRuntime().addShutdownHook(new 
Thread(monitoringServer::stop));
+        }
+
+        try {
+          Thread.currentThread().join();
+        } catch (InterruptedException ex) {
+          System.out.println("Main thread is interrupted! reason: " + ex);
+          ServerSettings.setStopAllThreads(true);
+        }
     }
 }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java 
b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
index f695be9a80..c6fcc84a02 100644
--- a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
@@ -161,7 +161,7 @@ public class AiravataAPIServer implements IServer {
                 server.serve();
                 setStatus(ServerStatus.STOPPED);
                 logger.info("Airavata Thrift API Stopped.");
-            }).start();
+            }, this.getClass().getSimpleName()).start();
 
             // Monitor server startup
             new Thread(() -> {
@@ -180,7 +180,7 @@ public class AiravataAPIServer implements IServer {
                         ServiceName.SHARING_REGISTRY.toString(), 
ServiceName.ORCHESTRATOR.toString(), ServiceName.USER_PROFILE.toString(),
                         ServiceName.TENANT_PROFILE.toString(), 
ServiceName.IAM_ADMIN_SERVICES.toString(), 
ServiceName.GROUP_MANAGER.toString());
                 }
-            }).start();
+            }, this.getClass().getSimpleName() + ".Monitor").start();
 
         } catch (TTransportException e) {
             logger.error("Failed to start Airavata Thrift API", e);
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
 
b/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 8ddc3f7ba6..9cc774b10e 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -57,6 +57,7 @@ public class ServerSettings extends ApplicationSettings {
 
     // email-based monitoring configurations
     private static final String EMAIL_BASED_MONITORING_PERIOD = 
"email.based.monitoring.period";
+    private static final String EMAIL_BASED_MONITOR_HOST = 
"email.based.monitor.host";
     private static final String EMAIL_BASED_MONITOR_ADDRESS = 
"email.based.monitor.address";
     private static final String EMAIL_BASED_MONITOR_PASSWORD = 
"email.based.monitor.password";
     private static final String EMAIL_BASED_MONITOR_FOLDER_NAME = 
"email.based.monitor.folder.name";
@@ -177,6 +178,10 @@ public class ServerSettings extends ApplicationSettings {
         return Integer.parseInt(getSetting(EMAIL_BASED_MONITORING_PERIOD, 
"100000"));
     }
 
+    public static String getEmailBasedMonitorHost() throws 
ApplicationSettingsException {
+      return getSetting(EMAIL_BASED_MONITOR_HOST);
+  }
+
     public static String getEmailBasedMonitorAddress() throws 
ApplicationSettingsException {
         return getSetting(EMAIL_BASED_MONITOR_ADDRESS);
     }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
 
b/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
index a4dd2c83b8..f0c6081c75 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
@@ -80,7 +80,7 @@ public class DBEventManagerRunner implements IServer {
 
             // start the worker thread
             log.info("Starting the DB Event Manager runner.");
-            new Thread(runner).start();
+            new Thread(runner, this.getClass().getSimpleName()).start();
             setStatus(ServerStatus.STARTED);
         } catch (Exception ex) {
             log.error("Something went wrong with the DB Event Manager runner. 
Error: " + ex, ex);
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
index 93de5213d3..716853d3ca 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -135,7 +135,14 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
         return taskRegistry;
     }
 
+    @Override
     public void run() {
+        var thread = new Thread(this::start, this.getClass().getSimpleName());
+        thread.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt));
+    }
+
+    private void start() {
         ZKHelixAdmin zkHelixAdmin = new 
ZKHelixAdmin.Builder().setZkAddress(zkAddress).build();
         try {
             List<String> nodesInCluster = 
zkHelixAdmin.getInstancesInCluster(clusterName);
@@ -165,7 +172,7 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
                     logger.warn("Participant: " + participantName + " was not 
disabled normally", e);
                 }
                 disconnect();
-            }));
+            }, this.getClass().getSimpleName() + ".ShutdownHook"));
 
             // connect the participant manager
             connect();
@@ -195,16 +202,12 @@ public class HelixParticipant<T extends AbstractTask> 
implements Runnable {
             zkHelixManager.connect();
             logger.info("Participant: " + participantName + ", has connected 
to cluster: " + clusterName);
 
-            if (ServerSettings.getBooleanSetting("api.monitoring.enabled")) {
-              System.out.println("Starting participant monitoring server 
.......");
-              var monitoringServer = new MonitoringServer(
-                      ServerSettings.getSetting("api.monitoring.host"),
-                      ServerSettings.getIntSetting("api.monitoring.port"));
-              monitoringServer.start();
-              Runtime.getRuntime().addShutdownHook(new 
Thread(monitoringServer::stop));
+            try {
+              Thread.currentThread().join();
+            } catch (InterruptedException ex) {
+              logger.error("Participant: " + participantName + ", is 
interrupted! reason: " + ex, ex);
             }
-            Thread.currentThread().join();
-            
+
         } catch (InterruptedException ex) {
             logger.error("Participant: " + participantName + ", is 
interrupted! reason: " + ex, ex);
         } catch (Exception ex) {
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
index 292e21e3fc..1b9aec7117 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java
@@ -74,7 +74,7 @@ public class HelixController implements Runnable {
     }
 
     public void start() throws Exception {
-        Thread controllerThread = new Thread(this);
+        Thread controllerThread = new Thread(this, 
this.getClass().getSimpleName());
         controllerThread.start();
         Runtime.getRuntime().addShutdownHook(new 
Thread(controllerThread::interrupt));
     }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 18e1e7e32b..2b40c169d5 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -19,6 +19,7 @@
 */
 package org.apache.airavata.helix.impl.workflow;
 
+import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.*;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -48,7 +49,6 @@ import org.apache.airavata.monitor.JobStateValidator;
 import org.apache.airavata.monitor.JobStatusResult;
 import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
 import org.apache.airavata.monitor.platform.CountMonitor;
-import org.apache.airavata.monitor.platform.MonitoringServer;
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -70,19 +70,9 @@ public class PostWorkflowManager extends WorkflowManager {
 
     @Override
     public void run() {
-        try {
-          if 
(ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) {
-              MonitoringServer monitoringServer = new MonitoringServer(
-                      
ServerSettings.getSetting("post.workflow.manager.monitoring.host"),
-                      
ServerSettings.getIntSetting("post.workflow.manager.monitoring.port"));
-              monitoringServer.start();
-
-              Runtime.getRuntime().addShutdownHook(new 
Thread(monitoringServer::stop));
-          }
-          startServer();
-        } catch (Exception e) {
-            logger.error("Error starting PreWorkflowManager", e);
-        }
+        var thread = new Thread(this::startServer, 
this.getClass().getSimpleName());
+        thread.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt));
     }
 
     private void init() throws Exception {
@@ -328,58 +318,75 @@ public class PostWorkflowManager extends WorkflowManager {
         registerWorkflowForProcess(processId, workflowName, "POST");
     }
 
-    public void startServer() throws Exception {
-
-        init();
-        final Consumer<String, JobStatusResult> consumer = createConsumer();
-        new Thread(() -> {
-                    while (true) {
-                        final ConsumerRecords<String, JobStatusResult> 
consumerRecords = consumer.poll(Long.MAX_VALUE);
-                        var executorCompletionService = new 
ExecutorCompletionService<>(processingPool);
-                        var processingFutures = new ArrayList<>();
-
-                        for (var topicPartition : 
consumerRecords.partitions()) {
-                            var partitionRecords = 
consumerRecords.records(topicPartition);
-                            logger.info("Received job records {}", 
partitionRecords.size());
-
-                            for (var record : partitionRecords) {
-                                var topic = topicPartition.topic();
-                                var partition = topicPartition.partition();
-                                var key = record.key();
-                                var value = record.value();
-                                logger.info("received post on {}/{}: {}->{}", 
topic, partition, key, value);
-                                logger.info(
-                                        "Submitting {} to process in thread 
pool",
-                                        record.value().getJobId());
-
-                                // This avoids kafka read thread to wait until 
processing is completed before committing
-                                // There is a risk of missing 20 messages in 
case of a restart, but this improves the
-                                // robustness of the kafka read thread by 
avoiding wait timeouts
-                                
processingFutures.add(executorCompletionService.submit(() -> {
-                                    boolean success = process(record.value());
-                                    logger.info(
-                                            "Status of processing {} : {}",
-                                            record.value().getJobId(),
-                                            success);
-                                    return success;
-                                }));
-
-                                consumer.commitSync(Collections.singletonMap(
-                                        topicPartition, new 
OffsetAndMetadata(record.offset() + 1)));
-                            }
-                        }
-
-                        for (var f : processingFutures) {
-                            try {
-                                executorCompletionService.take().get();
-                            } catch (Exception e) {
-                                logger.error("Failed processing job", e);
-                            }
-                        }
-                        logger.info("All messages processed. Moving to next 
round");
-                    }
-                })
-                .start();
+    public void startServer() {
+        try {
+          init();
+        } catch (Exception e) {
+          logger.error("Error starting PostWorkflowManager", e);
+        }
+        final Consumer<String, JobStatusResult> consumer;
+        try {
+          consumer = createConsumer();
+        } catch (ApplicationSettingsException e) {
+          logger.error("Error creating consumer", e);
+          return;
+        }
+        try {
+          while (true) {
+              final ConsumerRecords<String, JobStatusResult> consumerRecords = 
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+              var executorCompletionService = new 
ExecutorCompletionService<>(processingPool);
+              var processingFutures = new ArrayList<>();
+      
+              for (var topicPartition : consumerRecords.partitions()) {
+                  var partitionRecords = 
consumerRecords.records(topicPartition);
+                  logger.info("Received job records {}", 
partitionRecords.size());
+      
+                  for (var record : partitionRecords) {
+                      var topic = topicPartition.topic();
+                      var partition = topicPartition.partition();
+                      var key = record.key();
+                      var value = record.value();
+                      logger.info("received post on {}/{}: {}->{}", topic, 
partition, key, value);
+                      logger.info(
+                              "Submitting {} to process in thread pool",
+                              record.value().getJobId());
+      
+                      // This avoids kafka read thread to wait until 
processing is completed before committing
+                      // There is a risk of missing 20 messages in case of a 
restart, but this improves the
+                      // robustness of the kafka read thread by avoiding wait 
timeouts
+                      
processingFutures.add(executorCompletionService.submit(() -> {
+                          boolean success = process(record.value());
+                          logger.info(
+                                  "Status of processing {} : {}",
+                                  record.value().getJobId(),
+                                  success);
+                          return success;
+                      }));
+      
+                      consumer.commitSync(Collections.singletonMap(
+                              topicPartition, new 
OffsetAndMetadata(record.offset() + 1)));
+                  }
+              }
+      
+              for (var f : processingFutures) {
+                  try {
+                      executorCompletionService.take().get();
+                  } catch (Exception e) {
+                      logger.error("Failed processing job", e);
+                  }
+              }
+              logger.info("All messages processed. Moving to next round");
+              
+              if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException("PostWorkflowManager is 
interrupted!");
+              }
+          }
+        } catch (InterruptedException ex) {
+          logger.error("PostWorkflowManager is interrupted! reason: " + ex, 
ex);
+        } finally {
+          consumer.close();
+          processingPool.shutdown();
+        }
     }
 
     private void saveAndPublishJobStatus(
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index b009199229..7636b8c0eb 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -75,9 +75,16 @@ public class PreWorkflowManager extends WorkflowManager {
                 
Boolean.parseBoolean(ServerSettings.getSetting("pre.workflow.manager.loadbalance.clusters")));
     }
 
-    public void startServer() throws Exception {
+    public void startServer() {
+      try {
         super.initComponents();
         initLaunchSubscriber();
+        Thread.currentThread().join();
+      } catch (InterruptedException ex) {
+        logger.error("PreWorkflowManager is interrupted! reason: " + ex, ex);
+      } catch (Exception e) {
+        logger.error("Error starting PreWorkflowManager", e);
+      }
     }
 
     public void stopServer() {}
@@ -284,11 +291,9 @@ public class PreWorkflowManager extends WorkflowManager {
 
     @Override
     public void run() {
-        try {
-            startServer();
-        } catch (Exception e) {
-            logger.error("Error starting PreWorkflowManager", e);
-        }
+        var thread = new Thread(this::startServer, 
this.getClass().getSimpleName());
+        thread.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt));
     }
 
     private class ProcessLaunchMessageHandler implements MessageHandler {
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index 7d9eec019b..6e8eef4eb4 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -69,7 +69,7 @@ public class EmailBasedMonitor extends AbstractMonitor {
 
     private void init() throws Exception {
         loadContext();
-        host = ServerSettings.getApiServerHost();
+        host = ServerSettings.getEmailBasedMonitorHost();
         emailAddress = ServerSettings.getEmailBasedMonitorAddress();
         password = ServerSettings.getEmailBasedMonitorPassword();
         storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
@@ -139,7 +139,7 @@ public class EmailBasedMonitor extends AbstractMonitor {
     }
 
     public void monitor(String jobId) {
-        log.info("[EJM]: Added monitor Id : {} to email based monitor map", 
jobId);
+        log.info("Added monitor Id : {} to email based monitor map", jobId);
     }
 
     private JobStatusResult parse(Message message, String publisherId) throws 
MessagingException, AiravataException {
@@ -148,7 +148,7 @@ public class EmailBasedMonitor extends AbstractMonitor {
         ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
         EmailParser emailParser = emailParserMap.get(jobMonitorType);
         if (emailParser == null) {
-            throw new AiravataException("[EJM]: Un-handle resource job manager 
type: " + jobMonitorType.toString()
+            throw new AiravataException("Un-handle resource job manager type: 
" + jobMonitorType.toString()
                     + " for email monitoring -->  " + addressStr);
         }
         RegistryService.Iface registry = getRegistry();
@@ -167,79 +167,90 @@ public class EmailBasedMonitor extends AbstractMonitor {
                 return addressEntry.getValue();
             }
         }
-        throw new AiravataException("[EJM]: Couldn't identify Resource job 
manager type from address " + addressStr);
+        throw new AiravataException("Couldn't identify Resource job manager 
type from address " + addressStr);
     }
 
-    @Override
-    public void run() {
-
+    private void runEmailMonitor() {
+        Session session = null;
+        SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), 
false);
         while (!ServerSettings.isStopAllThreads()) {
             try {
-                Session session = Session.getDefaultInstance(properties);
-                store = session.getStore(storeProtocol);
-                store.connect(host, emailAddress, password);
-                emailFolder = store.getFolder(folderName);
-                // first we search for all unread messages.
-                SearchTerm unseenBefore = new FlagTerm(new 
Flags(Flags.Flag.SEEN), false);
-                while (!ServerSettings.isStopAllThreads()) {
-                    Thread.sleep(ServerSettings.getEmailMonitorPeriod()); // 
sleep for long enough
-                    if (!store.isConnected()) {
-                        store.connect();
-                        emailFolder = store.getFolder(folderName);
-                    }
-                    log.info("[EJM]: Retrieving unseen emails");
-                    if (emailFolder == null) {
-                        return;
-                    }
-                    emailFolder.open(Folder.READ_WRITE);
-                    if (emailFolder.isOpen()) {
-                        // flush if any message left in flushUnseenMessage
-                        if (flushUnseenMessages != null && 
flushUnseenMessages.length > 0) {
-                            try {
+                if (session == null) {
+                    session = Session.getDefaultInstance(properties);
+                }
+                if (store == null || !store.isConnected()) {
+                    store = session.getStore(storeProtocol);
+                    store.connect(host, emailAddress, password);
+                }
+                if (emailFolder == null || !emailFolder.isOpen()) {
+                    emailFolder = store.getFolder(folderName);
+                }
+
+                log.info("Retrieving unseen emails");
+                if (emailFolder == null) {
+                    return;
+                }
+                emailFolder.open(Folder.READ_WRITE);
+                if (emailFolder.isOpen()) {
+                    // flush if any message left in flushUnseenMessage
+                    if (flushUnseenMessages != null && 
flushUnseenMessages.length > 0) {
+                        try {
+                            emailFolder.setFlags(flushUnseenMessages, new 
Flags(Flags.Flag.SEEN), false);
+                            flushUnseenMessages = null;
+                        } catch (MessagingException e) {
+                            if (!store.isConnected()) {
+                                store.connect();
                                 emailFolder.setFlags(flushUnseenMessages, new 
Flags(Flags.Flag.SEEN), false);
                                 flushUnseenMessages = null;
-                            } catch (MessagingException e) {
-                                if (!store.isConnected()) {
-                                    store.connect();
-                                    emailFolder.setFlags(flushUnseenMessages, 
new Flags(Flags.Flag.SEEN), false);
-                                    flushUnseenMessages = null;
-                                }
                             }
                         }
-                        Message[] searchMessages = 
emailFolder.search(unseenBefore);
-                        if (searchMessages == null || searchMessages.length == 
0) {
-                            log.info("[EJM]: No new email messages");
-                        } else {
-                            log.info("[EJM]: {} new email/s received", 
searchMessages.length);
-                            processMessages(searchMessages);
-                        }
-                        emailFolder.close(false);
                     }
+                    Message[] searchMessages = 
emailFolder.search(unseenBefore);
+                    if (searchMessages == null || searchMessages.length == 0) {
+                        log.info("No new email messages");
+                    } else {
+                        log.info("{} new email/s received", 
searchMessages.length);
+                        processMessages(searchMessages);
+                    }
+                    emailFolder.close(false);
+                }
+                if (Thread.currentThread().isInterrupted()) {
+                  throw new InterruptedException("EmailBasedMonitor is 
interrupted!");
                 }
+            } catch (InterruptedException ex) {
+              log.error("EmailBasedMonitor is interrupted! reason: " + ex, ex);
             } catch (MessagingException e) {
-                log.error("[EJM]: Couldn't connect to the store ", e);
-            } catch (InterruptedException e) {
-                log.error("[EJM]: Interrupt exception while sleep ", e);
-            } catch (AiravataException e) {
-                log.error("[EJM]: UnHandled arguments ", e);
+                log.error("Couldn't connect to the store ", e);
             } catch (Throwable e) {
-                log.error("[EJM]: Caught a throwable ", e);
+                log.error("Caught a throwable ", e);
             } finally {
                 try {
-                    if (emailFolder != null) {
+                    if (emailFolder != null && emailFolder.isOpen()) {
                         emailFolder.close(false);
                     }
-                    if (store != null) {
+                    if (store != null && store.isConnected()) {
                         store.close();
                     }
                 } catch (MessagingException e) {
-                    log.error("[EJM]: Store close operation failed, couldn't 
close store", e);
-                } catch (Throwable e) {
-                    log.error("[EJM]: Caught a throwable while closing email 
store ", e);
+                    log.error("Store close operation failed, couldn't close 
store", e);
+                }
+                try {
+                  Thread.sleep(ServerSettings.getEmailMonitorPeriod());
+                } catch (InterruptedException e) {
+                  log.error("interrupted while sleeping ", e);
+                } catch (Exception e) {
+                  log.error("exception thrown when attempting to sleep ", e);
                 }
             }
         }
-        log.info("[EJM]: Email monitoring daemon stopped");
+        log.info("Email monitoring daemon stopped");
+    }
+
+    @Override
+    public void run() {
+        var thread = new Thread(this::runEmailMonitor, 
this.getClass().getSimpleName());
+        thread.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt));
     }
 
     private void processMessages(Message[] searchMessages) throws 
MessagingException {
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
index 755cdc2e91..a1bfca1e42 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
@@ -37,11 +37,11 @@ public class MonitoringServer {
         this.port = port;
     }
 
-    public void start() throws IOException {
+    /**
+     * Creates the monitoring HTTP server for the configured host and port.
+     *
+     * <p>Any failure is logged and not rethrown, so this method never throws; in that case
+     * {@code httpServer} is not assigned.
+     */
+    public void start() {
         try {
             logger.info("Starting the monitoring server");
             httpServer = new HTTPServer(host, port, true);
-        } catch (IOException e) {
-            logger.error("Failed to start the monitoring server on host {} na port {}", host, port, e);
+        } catch (Exception e) {
+            // Best effort: a monitoring endpoint that cannot start is reported, not fatal.
+            logger.error("Failed to start the monitoring server on host {} and port {}", host, port, e);
         }
     }
@@ -49,7 +49,7 @@ public class MonitoringServer {
     public void stop() {
         if (httpServer != null) {
             logger.info("Stopping the monitor server");
-            httpServer.stop();
+            httpServer.close();
         }
     }
 }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
 
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 9b93ca8b67..5ac34f6c3f 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -64,20 +64,35 @@ public class RealtimeMonitor extends AbstractMonitor {
         return consumer;
     }
 
-    private void runConsumer() throws ApplicationSettingsException {
-        final Consumer<String, String> consumer = createConsumer();
+    /**
+     * Creates the consumer and polls it until the current thread is interrupted.
+     *
+     * <p>Every polled record is handed to {@code process}; a failure on a single record is
+     * logged and does not stop the loop. Offsets are committed asynchronously after each
+     * batch, and the consumer is closed when the loop ends for any reason. If the consumer
+     * cannot be created, the error is logged and the method returns without polling.
+     */
+    private void runConsumer() {
+        final Consumer<String, String> consumer;
+        try {
+            consumer = createConsumer();
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error while creating consumer", e);
+            return;
+        }
 
-        while (true) {
-            final ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofSeconds(1));
-            RegistryService.Iface registry = getRegistry();
-            consumerRecords.forEach(record -> {
-                try {
-                    process(record.key(), record.value(), registry);
-                } catch (Exception e) {
-                    logger.error("Error while processing message {}", 
record.value(), e);
-                }
-            });
-            consumer.commitAsync();
+        try {
+            // Leave the loop through its condition instead of throwing an exception.
+            while (!Thread.currentThread().isInterrupted()) {
+                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
+                RegistryService.Iface registry = getRegistry();
+                consumerRecords.forEach(record -> {
+                    try {
+                        process(record.key(), record.value(), registry);
+                    } catch (Exception e) {
+                        logger.error("Error while processing message {}", record.value(), e);
+                    }
+                });
+                consumer.commitAsync();
+            }
+            logger.info("RealtimeMonitor is interrupted, stopping the consumer");
+        } catch (org.apache.kafka.common.errors.InterruptException ex) {
+            // Kafka reports an interrupt during a blocking call with this unchecked exception.
+            // NOTE(review): assumes Consumer is org.apache.kafka.clients.consumer.Consumer - confirm.
+            logger.info("RealtimeMonitor is interrupted while polling, stopping the consumer", ex);
+        } finally {
+            try {
+                consumer.close();
+            } catch (org.apache.kafka.common.errors.InterruptException ex) {
+                // close() may also refuse to block on an already interrupted thread.
+                logger.warn("Interrupted while closing the consumer", ex);
+            }
         }
     }
 
@@ -94,10 +109,8 @@ public class RealtimeMonitor extends AbstractMonitor {
 
     @Override
     public void run() {
-        try {
-            runConsumer();
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error while running consumer", e);
-        }
+        var thread = new Thread(this::runConsumer, 
this.getClass().getSimpleName());
+        thread.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt));
     }
 }
diff --git a/airavata-api/src/main/resources/airavata-server.properties 
b/airavata-api/src/main/resources/airavata-server.properties
index 66951740c2..febed3a742 100644
--- a/airavata-api/src/main/resources/airavata-server.properties
+++ b/airavata-api/src/main/resources/airavata-server.properties
@@ -70,6 +70,7 @@ 
workflowcatalog.jdbc.url=jdbc:mariadb://airavata.host:13306/workflow_catalog
 workflowcatalog.jdbc.user=airavata
 workflowcatalog.validationQuery=SELECT 1 from CONFIGURATION
 
+email.based.monitor.host=imap.gmail.com
 [email protected]
 email.based.monitor.folder.name=INBOX
 email.based.monitor.password=123456
diff --git a/airavata-api/src/main/resources/distribution/bin/setenv.sh 
b/airavata-api/src/main/resources/distribution/bin/setenv.sh
new file mode 100755
index 0000000000..9024de1d7b
--- /dev/null
+++ b/airavata-api/src/main/resources/distribution/bin/setenv.sh
@@ -0,0 +1,146 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Resolve symlinks to get the real script location
+PRG="$0"
+while [ -L "$PRG" ]; do
+  PRG_LINK=$(readlink "$PRG")
+  # readlink prints the target exactly as stored in the link. A relative target is
+  # relative to the directory holding the link, not to the current working directory.
+  case "$PRG_LINK" in
+  /*) PRG="$PRG_LINK" ;;
+  *) PRG="$(dirname "$PRG")/$PRG_LINK" ;;
+  esac
+done
+unset PRG_LINK
+PRGDIR=$(dirname "$PRG")
+
+# Set AIRAVATA_HOME if not already set (parent of the directory holding the script)
+[ -z "$AIRAVATA_HOME" ] && AIRAVATA_HOME=$(cd "$PRGDIR/.." && pwd)
+
+# Build CLASSPATH from all JAR files
+CLASSPATH=$(printf "%s:" "$AIRAVATA_HOME"/lib/*.jar)
+CLASSPATH=${CLASSPATH%:} # Remove trailing colon
+
+export AIRAVATA_HOME CLASSPATH
+
+# Common function to run Airavata services
+# Usage: run_service <service_name> <main_class> <java_opts> [command line...]
+#
+#   service_name  Used for the PID file name, the default log file name and the
+#                 exported SERVICE_NAME variable
+#   main_class    Java class to launch
+#   java_opts     JVM options as one whitespace-separated string
+#   command line  Parsed for -d, -xdebug, -log <file>, -h and start|stop|restart;
+#                 anything else is forwarded to the main class in foreground mode
+#
+# Exits the calling shell after a daemon command, after -h, and on usage errors.
+# NOTE(review): in daemon mode the main class receives the remaining command line
+# beginning with the start/restart word itself, and arguments seen before that word
+# are not forwarded - confirm this is intended.
+run_service() {
+  local SERVICE_NAME="$1" MAIN_CLASS="$2" JAVA_OPTS="$3"
+  # Drop the three fixed parameters so only the real command line is parsed below;
+  # otherwise they would be treated as extra arguments for the main class.
+  shift $(($# < 3 ? $# : 3))
+  # Export SERVICE_NAME as environment variable for log4j2 configuration
+  export SERVICE_NAME
+  local CWD="$PWD" PID_PATH_NAME="${AIRAVATA_HOME}/bin/pid-${SERVICE_NAME}"
+  local DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/${SERVICE_NAME}.log"
+  local LOG_FILE="$DEFAULT_LOG_FILE" DAEMON_MODE=false
+  # Kept as an array so arguments containing whitespace survive unchanged
+  local EXTRA_ARGS=()
+
+  # Help text
+  local HELP_TEXT="Usage: ${SERVICE_NAME}.sh
+
+command options:
+  -d                  Run in daemon mode
+  -xdebug             Start ${SERVICE_NAME} under JPDA debugger
+  -log <LOG_FILE>     Where to redirect stdout/stderr (defaults to $DEFAULT_LOG_FILE)
+  -h                  Display this help and exit
+
+Daemon mode commands (use with -d):
+  start               Start server in daemon mode
+  stop                Stop server running in daemon mode
+  restart             Restart server in daemon mode"
+
+  cd "${AIRAVATA_HOME}/bin"
+
+  # Helper function to stop daemon process
+  # Returns 0 after stopping the process named in the PID file, 1 if there is no PID file
+  stop_daemon() {
+    if [[ -f "$PID_PATH_NAME" ]]; then
+      local PID
+      PID=$(cat "$PID_PATH_NAME")
+      echo "$SERVICE_NAME stopping..."
+      pkill -P "$PID"
+      kill "$PID"
+
+      # Give the process up to 20 seconds to exit before escalating to SIGKILL
+      local retry=0
+      while kill -0 "$PID" 2>/dev/null && ((retry++ < 20)); do
+        echo "[PID: $PID] Waiting for process to stop..."
+        sleep 1
+      done
+
+      if kill -0 "$PID" 2>/dev/null; then
+        echo "[PID: $PID] Forcefully killing non-responsive process..."
+        pkill -9 -P "$PID"
+        kill -9 "$PID"
+      fi
+
+      echo "$SERVICE_NAME is now stopped."
+      rm "$PID_PATH_NAME"
+      return 0
+    else
+      echo "$SERVICE_NAME is not running."
+      return 1
+    fi
+  }
+
+  # Helper function to start daemon process
+  # Arguments are forwarded to the main class; output goes to $LOG_FILE
+  start_daemon() {
+    echo "Starting $SERVICE_NAME ..."
+    # A PID file whose process no longer exists would block every later start; discard it
+    if [[ -f "$PID_PATH_NAME" ]] && ! kill -0 "$(cat "$PID_PATH_NAME")" 2>/dev/null; then
+      echo "Removing stale PID file $PID_PATH_NAME"
+      rm -f "$PID_PATH_NAME"
+    fi
+    if [[ ! -f "$PID_PATH_NAME" ]]; then
+      # The redirection below fails if the log directory is missing
+      mkdir -p "$(dirname "$LOG_FILE")"
+      nohup java $JAVA_OPTS -classpath "$CLASSPATH" "$MAIN_CLASS" "$@" >"$LOG_FILE" 2>&1 &
+      echo $! >"$PID_PATH_NAME"
+      echo "$SERVICE_NAME now running: PID $(cat "$PID_PATH_NAME")"
+    else
+      echo "$SERVICE_NAME already running: PID $(cat "$PID_PATH_NAME")"
+    fi
+  }
+
+  # Parse command arguments
+  while (($# > 0)); do
+    case "$1" in
+    -d) DAEMON_MODE=true ;;
+    -xdebug) JAVA_OPTS+=" -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,address=*:8000" ;;
+    -log)
+      shift
+      if (($# == 0)); then
+        echo "Error: -log requires a file argument" >&2
+        echo "Use -h for help" >&2
+        exit 1
+      fi
+      LOG_FILE="$1"
+      # Relative paths are resolved against the directory the script was invoked from
+      [[ "$LOG_FILE" != /* ]] && LOG_FILE="${CWD}/${LOG_FILE}"
+      ;;
+    start | stop | restart)
+      if [[ "$DAEMON_MODE" == true ]]; then
+        case "$1" in
+        start) start_daemon "$@" ;;
+        stop) stop_daemon ;;
+        restart)
+          stop_daemon
+          start_daemon "$@"
+          ;;
+        esac
+        exit 0
+      else
+        EXTRA_ARGS+=("$1")
+      fi
+      ;;
+    -h)
+      echo "$HELP_TEXT"
+      exit 0
+      ;;
+    *) EXTRA_ARGS+=("$1") ;;
+    esac
+    shift
+  done
+
+  # Validate daemon mode usage
+  if [[ "$DAEMON_MODE" == true ]]; then
+    echo "Error: Daemon mode (-d) requires one of: start, stop, restart"
+    echo "Use -h for help"
+    exit 1
+  fi
+
+  # Run in foreground mode
+  java $JAVA_OPTS -classpath "$CLASSPATH" "$MAIN_CLASS" "${EXTRA_ARGS[@]}"
+}
diff --git a/airavata-api/src/main/resources/email-config.yaml 
b/airavata-api/src/main/resources/email-config.yml
similarity index 100%
rename from airavata-api/src/main/resources/email-config.yaml
rename to airavata-api/src/main/resources/email-config.yml
diff --git a/airavata-api/src/main/resources/log4j2.xml 
b/airavata-api/src/main/resources/log4j2.xml
index 8fb27b1b68..5b32d38646 100644
--- a/airavata-api/src/main/resources/log4j2.xml
+++ b/airavata-api/src/main/resources/log4j2.xml
@@ -32,6 +32,7 @@
     <logger name="org.apache.zookeeper" level="ERROR" />
     <logger name="org.apache.airavata" level="INFO" />
     <logger name="org.hibernate" level="ERROR" />
+    <logger name="org.apache.commons.beanutils.converters" level="ERROR" />
     <Root level="INFO">
       <AppenderRef ref="Console" />
     </Root>
diff --git 
a/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2 
b/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2
index 42124afc90..a5972e0e40 100644
--- a/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2
@@ -179,6 +179,7 @@ credential.store.jdbc.validationQuery=SELECT 1 from 
CONFIGURATION
 
monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
 
-#These properties will used to enable email base monitoring
+# These properties are used to enable email-based monitoring
+email.based.monitor.host={{ monitor_email_host }}
 email.based.monitor.address={{ monitor_email_address }}
 email.based.monitor.password={{ monitor_email_password }}
 email.based.monitor.folder.name=INBOX
diff --git 
a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
 
b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index 37ed894f8b..97f698f029 100644
--- 
a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++ 
b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -34,7 +34,9 @@ 
security.manager.class=org.apache.airavata.service.security.KeyCloakSecurityMana
 ###########################################################################
 # Monitoring module Configuration
 ###########################################################################
+email.based.monitor.host={{ monitor_email_host }}
 email.based.monitor.address={{ monitor_email_address }}
+email.based.monitor.password={{ monitor_email_password }}
 job.notification.emailids=
 job.notification.enabled=true
 
diff --git a/dev-tools/deployment-scripts/airavata-server.properties 
b/dev-tools/deployment-scripts/airavata-server.properties
index 400e3c01cc..30b92b9e72 100644
--- a/dev-tools/deployment-scripts/airavata-server.properties
+++ b/dev-tools/deployment-scripts/airavata-server.properties
@@ -126,6 +126,7 @@ job.monitor.realtime.publisher.id=RealtimeProducer
 # email.based.monitor.folder.name=INBOX
 # email.expiration.minutes=60
 # email.based.monitoring.period=10000
+# email.based.monitor.host=CHANGEME
 # [email protected]
 # email.based.monitor.password=app_password_here
 


Reply via email to