This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch merge-svcs in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 7749cb85f06368f1f59f45ae2c77a82e55829fa8 Author: yasithdev <[email protected]> AuthorDate: Tue Aug 19 18:22:04 2025 -0400 add correct service name to threads. fix bugs, add missing resources and properties. --- .devcontainer/docker-compose-alt.yml | 2 +- .../src/main/assembly/api-server-bin-assembly.xml | 1 + .../src/main/java/org/apache/airavata/Main.java | 17 +++ .../org/apache/airavata/api/AiravataAPIServer.java | 4 +- .../airavata/common/utils/ServerSettings.java | 5 + .../db/event/manager/DBEventManagerRunner.java | 2 +- .../helix/core/participant/HelixParticipant.java | 23 ++-- .../helix/impl/controller/HelixController.java | 2 +- .../helix/impl/workflow/PostWorkflowManager.java | 139 ++++++++++---------- .../helix/impl/workflow/PreWorkflowManager.java | 17 ++- .../airavata/monitor/email/EmailBasedMonitor.java | 119 +++++++++-------- .../monitor/platform/MonitoringServer.java | 6 +- .../airavata/monitor/realtime/RealtimeMonitor.java | 49 ++++--- .../src/main/resources/airavata-server.properties | 1 + .../src/main/resources/distribution/bin/setenv.sh | 146 +++++++++++++++++++++ .../{email-config.yaml => email-config.yml} | 0 airavata-api/src/main/resources/log4j2.xml | 1 + .../templates/airavata-server.properties.j2 | 1 + .../participant/airavata-server.properties.j2 | 2 + .../deployment-scripts/airavata-server.properties | 1 + 20 files changed, 376 insertions(+), 162 deletions(-) diff --git a/.devcontainer/docker-compose-alt.yml b/.devcontainer/docker-compose-alt.yml index ddf80cb50e..8928180c3d 100644 --- a/.devcontainer/docker-compose-alt.yml +++ b/.devcontainer/docker-compose-alt.yml @@ -159,7 +159,6 @@ services: - zookeeper.server.connection=zookeeper:2181 - api.server.host=apiserver - rabbitmq.broker.url=amqp://guest:guest@rabbitmq:5672/develop - - email.based.monitor.address=CHANGEME - realtime.monitor.enabled=false command: ["/tmp/wait-for-it.sh", "zookeeper:2181", "--", "/tmp/wait-for-it.sh", "apiserver:8970", "--" , "/tmp/wait-for-it.sh", "rabbitmq:5672", "--", "/opt/apache-airavata-participant/bin/participant.sh"] @@ -177,6 +176,7 @@ services: - zookeeper.server.connection=zookeeper:2181 - api.server.host=apiserver - api.server.port=8970 + - email.based.monitor.host=CHANGEME - email.based.monitor.address=CHANGEME - email.based.monitor.password=CHANGEME - kafka.broker.url=kafka:9092 diff --git a/airavata-api/src/main/assembly/api-server-bin-assembly.xml b/airavata-api/src/main/assembly/api-server-bin-assembly.xml index a4ba2f2ae6..9bfea07815 100644 --- a/airavata-api/src/main/assembly/api-server-bin-assembly.xml +++ b/airavata-api/src/main/assembly/api-server-bin-assembly.xml @@ -62,6 +62,7 @@ <include>templates/*.template</include> <include>*.properties</include> <include>*.xml</include> + <include>*.yml</include> </includes> </fileSet> diff --git a/airavata-api/src/main/java/org/apache/airavata/Main.java b/airavata-api/src/main/java/org/apache/airavata/Main.java index b843b29069..493270b0ee 100644 --- a/airavata-api/src/main/java/org/apache/airavata/Main.java +++ b/airavata-api/src/main/java/org/apache/airavata/Main.java @@ -20,6 +20,7 @@ package org.apache.airavata; import org.apache.airavata.api.AiravataAPIServer; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.db.event.manager.DBEventManagerRunner; import org.apache.airavata.helix.impl.controller.HelixController; import org.apache.airavata.helix.impl.participant.GlobalParticipant; @@ -27,6 +28,7 @@ import org.apache.airavata.helix.impl.workflow.PostWorkflowManager; import org.apache.airavata.helix.impl.workflow.PreWorkflowManager; import org.apache.airavata.monitor.cluster.ClusterStatusMonitorJobScheduler; import org.apache.airavata.monitor.email.EmailBasedMonitor; +import org.apache.airavata.monitor.platform.MonitoringServer; import org.apache.airavata.monitor.realtime.RealtimeMonitor; public class Main { @@ -69,5 +71,20 @@ public class Main { var jobScheduler = new ClusterStatusMonitorJobScheduler(); assert jobScheduler != null; // jobScheduler.scheduleClusterStatusMonitoring(); + + if (ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) { + var monitoringServer = new MonitoringServer( + ServerSettings.getSetting("post.workflow.manager.monitoring.host"), + ServerSettings.getIntSetting("post.workflow.manager.monitoring.port")); + monitoringServer.start(); + Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); + } + + try { + Thread.currentThread().join(); + } catch (InterruptedException ex) { + System.out.println("Main thread is interrupted! reason: " + ex); + ServerSettings.setStopAllThreads(true); + } } } diff --git a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java index f695be9a80..c6fcc84a02 100644 --- a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java +++ b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java @@ -161,7 +161,7 @@ public class AiravataAPIServer implements IServer { server.serve(); setStatus(ServerStatus.STOPPED); logger.info("Airavata Thrift API Stopped."); - }).start(); + }, this.getClass().getSimpleName()).start(); // Monitor server startup new Thread(() -> { @@ -180,7 +180,7 @@ public class AiravataAPIServer implements IServer { ServiceName.SHARING_REGISTRY.toString(), ServiceName.ORCHESTRATOR.toString(), ServiceName.USER_PROFILE.toString(), ServiceName.TENANT_PROFILE.toString(), ServiceName.IAM_ADMIN_SERVICES.toString(), ServiceName.GROUP_MANAGER.toString()); } - }).start(); + }, this.getClass().getSimpleName() + ".Monitor").start(); } catch (TTransportException e) { logger.error("Failed to start Airavata Thrift API", e); diff --git a/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 8ddc3f7ba6..9cc774b10e 100644 --- a/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/airavata-api/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -57,6 +57,7 @@ public class ServerSettings extends ApplicationSettings { // email-based monitoring configurations private static final String EMAIL_BASED_MONITORING_PERIOD = "email.based.monitoring.period"; + private static final String EMAIL_BASED_MONITOR_HOST = "email.based.monitor.host"; private static final String EMAIL_BASED_MONITOR_ADDRESS = "email.based.monitor.address"; private static final String EMAIL_BASED_MONITOR_PASSWORD = "email.based.monitor.password"; private static final String EMAIL_BASED_MONITOR_FOLDER_NAME = "email.based.monitor.folder.name"; @@ -177,6 +178,10 @@ public class ServerSettings extends ApplicationSettings { return Integer.parseInt(getSetting(EMAIL_BASED_MONITORING_PERIOD, "100000")); } + public static String getEmailBasedMonitorHost() throws ApplicationSettingsException { + return getSetting(EMAIL_BASED_MONITOR_HOST); + } + public static String getEmailBasedMonitorAddress() throws ApplicationSettingsException { return getSetting(EMAIL_BASED_MONITOR_ADDRESS); } diff --git a/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java b/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java index a4dd2c83b8..f0c6081c75 100644 --- a/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java +++ b/airavata-api/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java @@ -80,7 +80,7 @@ public class DBEventManagerRunner implements IServer { // start the worker thread log.info("Starting the DB Event Manager runner."); - new Thread(runner).start(); + new Thread(runner, this.getClass().getSimpleName()).start(); setStatus(ServerStatus.STARTED); } catch (Exception ex) { log.error("Something went wrong with the DB Event Manager runner. Error: " + ex, ex); diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java index 93de5213d3..716853d3ca 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java @@ -135,7 +135,14 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { return taskRegistry; } + @Override public void run() { + var thread = new Thread(this::start, this.getClass().getSimpleName()); + thread.start(); + Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt)); + } + + private void start() { ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin.Builder().setZkAddress(zkAddress).build(); try { List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName); @@ -165,7 +172,7 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { logger.warn("Participant: " + participantName + " was not disabled normally", e); } disconnect(); - })); + }, this.getClass().getSimpleName() + ".ShutdownHook")); // connect the participant manager connect(); @@ -195,16 +202,12 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { zkHelixManager.connect(); logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName); - if (ServerSettings.getBooleanSetting("api.monitoring.enabled")) { - System.out.println("Starting participant monitoring server ......."); - var monitoringServer = new MonitoringServer( - ServerSettings.getSetting("api.monitoring.host"), - ServerSettings.getIntSetting("api.monitoring.port")); - monitoringServer.start(); - Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); + try { + Thread.currentThread().join(); + } catch (InterruptedException ex) { + logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex); } - Thread.currentThread().join(); - + } catch (InterruptedException ex) { logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex); } catch (Exception ex) { diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java index 292e21e3fc..1b9aec7117 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java @@ -74,7 +74,7 @@ public class HelixController implements Runnable { } public void start() throws Exception { - Thread controllerThread = new Thread(this); + Thread controllerThread = new Thread(this, this.getClass().getSimpleName()); controllerThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(controllerThread::interrupt)); } diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index 18e1e7e32b..2b40c169d5 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -19,6 +19,7 @@ */ package org.apache.airavata.helix.impl.workflow; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -48,7 +49,6 @@ import org.apache.airavata.monitor.JobStateValidator; import org.apache.airavata.monitor.JobStatusResult; import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer; import org.apache.airavata.monitor.platform.CountMonitor; -import org.apache.airavata.monitor.platform.MonitoringServer; import org.apache.airavata.registry.api.RegistryService; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; @@ -70,19 +70,9 @@ public class PostWorkflowManager extends WorkflowManager { @Override public void run() { - try { - if (ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) { - MonitoringServer monitoringServer = new MonitoringServer( - ServerSettings.getSetting("post.workflow.manager.monitoring.host"), - ServerSettings.getIntSetting("post.workflow.manager.monitoring.port")); - monitoringServer.start(); - - Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); - } - startServer(); - } catch (Exception e) { - logger.error("Error starting PreWorkflowManager", e); - } + var thread = new Thread(this::startServer, this.getClass().getSimpleName()); + thread.start(); + Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt)); } private void init() throws Exception { @@ -328,58 +318,75 @@ public class PostWorkflowManager extends WorkflowManager { registerWorkflowForProcess(processId, workflowName, "POST"); } - public void startServer() throws Exception { - - init(); - final Consumer<String, JobStatusResult> consumer = createConsumer(); - new Thread(() -> { - while (true) { - final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE); - var executorCompletionService = new ExecutorCompletionService<>(processingPool); - var processingFutures = new ArrayList<>(); - - for (var topicPartition : consumerRecords.partitions()) { - var partitionRecords = consumerRecords.records(topicPartition); - logger.info("Received job records {}", partitionRecords.size()); - - for (var record : partitionRecords) { - var topic = topicPartition.topic(); - var partition = topicPartition.partition(); - var key = record.key(); - var value = record.value(); - logger.info("received post on {}/{}: {}->{}", topic, partition, key, value); - logger.info( - "Submitting {} to process in thread pool", - record.value().getJobId()); - - // This avoids kafka read thread to wait until processing is completed before committing - // There is a risk of missing 20 messages in case of a restart, but this improves the - // robustness of the kafka read thread by avoiding wait timeouts - processingFutures.add(executorCompletionService.submit(() -> { - boolean success = process(record.value()); - logger.info( - "Status of processing {} : {}", - record.value().getJobId(), - success); - return success; - })); - - consumer.commitSync(Collections.singletonMap( - topicPartition, new OffsetAndMetadata(record.offset() + 1))); - } - } - - for (var f : processingFutures) { - try { - executorCompletionService.take().get(); - } catch (Exception e) { - logger.error("Failed processing job", e); - } - } - logger.info("All messages processed. Moving to next round"); - } - }) - .start(); + public void startServer() { + try { + init(); + } catch (Exception e) { + logger.error("Error starting PostWorkflowManager", e); + } + final Consumer<String, JobStatusResult> consumer; + try { + consumer = createConsumer(); + } catch (ApplicationSettingsException e) { + logger.error("Error creating consumer", e); + return; + } + try { + while (true) { + final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Duration.ofMillis(Long.MAX_VALUE)); + var executorCompletionService = new ExecutorCompletionService<>(processingPool); + var processingFutures = new ArrayList<>(); + + for (var topicPartition : consumerRecords.partitions()) { + var partitionRecords = consumerRecords.records(topicPartition); + logger.info("Received job records {}", partitionRecords.size()); + + for (var record : partitionRecords) { + var topic = topicPartition.topic(); + var partition = topicPartition.partition(); + var key = record.key(); + var value = record.value(); + logger.info("received post on {}/{}: {}->{}", topic, partition, key, value); + logger.info( + "Submitting {} to process in thread pool", + record.value().getJobId()); + + // This avoids kafka read thread to wait until processing is completed before committing + // There is a risk of missing 20 messages in case of a restart, but this improves the + // robustness of the kafka read thread by avoiding wait timeouts + processingFutures.add(executorCompletionService.submit(() -> { + boolean success = process(record.value()); + logger.info( + "Status of processing {} : {}", + record.value().getJobId(), + success); + return success; + })); + + consumer.commitSync(Collections.singletonMap( + topicPartition, new OffsetAndMetadata(record.offset() + 1))); + } + } + + for (var f : processingFutures) { + try { + executorCompletionService.take().get(); + } catch (Exception e) { + logger.error("Failed processing job", e); + } + } + logger.info("All messages processed. Moving to next round"); + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("PostWorkflowManager is interrupted!"); + } + } + } catch (InterruptedException ex) { + logger.error("PostWorkflowManager is interrupted! reason: " + ex, ex); + } finally { + consumer.close(); + processingPool.shutdown(); + } } private void saveAndPublishJobStatus( diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java index b009199229..7636b8c0eb 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java @@ -75,9 +75,16 @@ public class PreWorkflowManager extends WorkflowManager { Boolean.parseBoolean(ServerSettings.getSetting("pre.workflow.manager.loadbalance.clusters"))); } - public void startServer() throws Exception { + public void startServer() { + try { super.initComponents(); initLaunchSubscriber(); + Thread.currentThread().join(); + } catch (InterruptedException ex) { + logger.error("PreWorkflowManager is interrupted! reason: " + ex, ex); + } catch (Exception e) { + logger.error("Error starting PreWorkflowManager", e); + } } public void stopServer() {} @@ -284,11 +291,9 @@ public class PreWorkflowManager extends WorkflowManager { @Override public void run() { - try { - startServer(); - } catch (Exception e) { - logger.error("Error starting PreWorkflowManager", e); - } + var thread = new Thread(this::startServer, this.getClass().getSimpleName()); + thread.start(); + Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt)); } private class ProcessLaunchMessageHandler implements MessageHandler { diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java index 7d9eec019b..6e8eef4eb4 100644 --- a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java +++ b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java @@ -69,7 +69,7 @@ public class EmailBasedMonitor extends AbstractMonitor { private void init() throws Exception { loadContext(); - host = ServerSettings.getApiServerHost(); + host = ServerSettings.getEmailBasedMonitorHost(); emailAddress = ServerSettings.getEmailBasedMonitorAddress(); password = ServerSettings.getEmailBasedMonitorPassword(); storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol(); @@ -139,7 +139,7 @@ public class EmailBasedMonitor extends AbstractMonitor { } public void monitor(String jobId) { - log.info("[EJM]: Added monitor Id : {} to email based monitor map", jobId); + log.info("Added monitor Id : {} to email based monitor map", jobId); } private JobStatusResult parse(Message message, String publisherId) throws MessagingException, AiravataException { @@ -148,7 +148,7 @@ public class EmailBasedMonitor extends AbstractMonitor { ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr); EmailParser emailParser = emailParserMap.get(jobMonitorType); if (emailParser == null) { - throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType.toString() + throw new AiravataException("Un-handle resource job manager type: " + jobMonitorType.toString() + " for email monitoring --> " + addressStr); } RegistryService.Iface registry = getRegistry(); @@ -167,79 +167,90 @@ public class EmailBasedMonitor extends AbstractMonitor { return addressEntry.getValue(); } } - throw new AiravataException("[EJM]: Couldn't identify Resource job manager type from address " + addressStr); + throw new AiravataException("Couldn't identify Resource job manager type from address " + addressStr); } - @Override - public void run() { - + private void runEmailMonitor() { + Session session = null; + SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false); while (!ServerSettings.isStopAllThreads()) { try { - Session session = Session.getDefaultInstance(properties); - store = session.getStore(storeProtocol); - store.connect(host, emailAddress, password); - emailFolder = store.getFolder(folderName); - // first we search for all unread messages. - SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false); - while (!ServerSettings.isStopAllThreads()) { - Thread.sleep(ServerSettings.getEmailMonitorPeriod()); // sleep for long enough - if (!store.isConnected()) { - store.connect(); - emailFolder = store.getFolder(folderName); - } - log.info("[EJM]: Retrieving unseen emails"); - if (emailFolder == null) { - return; - } - emailFolder.open(Folder.READ_WRITE); - if (emailFolder.isOpen()) { - // flush if any message left in flushUnseenMessage - if (flushUnseenMessages != null && flushUnseenMessages.length > 0) { - try { + if (session == null) { + session = Session.getDefaultInstance(properties); + } + if (store == null || !store.isConnected()) { + store = session.getStore(storeProtocol); + store.connect(host, emailAddress, password); + } + if (emailFolder == null || !emailFolder.isOpen()) { + emailFolder = store.getFolder(folderName); + } + + log.info("Retrieving unseen emails"); + if (emailFolder == null) { + return; + } + emailFolder.open(Folder.READ_WRITE); + if (emailFolder.isOpen()) { + // flush if any message left in flushUnseenMessage + if (flushUnseenMessages != null && flushUnseenMessages.length > 0) { + try { + emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false); + flushUnseenMessages = null; + } catch (MessagingException e) { + if (!store.isConnected()) { + store.connect(); emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false); flushUnseenMessages = null; - } catch (MessagingException e) { - if (!store.isConnected()) { - store.connect(); - emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false); - flushUnseenMessages = null; - } } } - Message[] searchMessages = emailFolder.search(unseenBefore); - if (searchMessages == null || searchMessages.length == 0) { - log.info("[EJM]: No new email messages"); - } else { - log.info("[EJM]: {} new email/s received", searchMessages.length); - processMessages(searchMessages); - } - emailFolder.close(false); } + Message[] searchMessages = emailFolder.search(unseenBefore); + if (searchMessages == null || searchMessages.length == 0) { + log.info("No new email messages"); + } else { + log.info("{} new email/s received", searchMessages.length); + processMessages(searchMessages); + } + emailFolder.close(false); + } + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("EmailBasedMonitor is interrupted!"); } + } catch (InterruptedException ex) { + log.error("EmailBasedMonitor is interrupted! reason: " + ex, ex); } catch (MessagingException e) { - log.error("[EJM]: Couldn't connect to the store ", e); - } catch (InterruptedException e) { - log.error("[EJM]: Interrupt exception while sleep ", e); - } catch (AiravataException e) { - log.error("[EJM]: UnHandled arguments ", e); + log.error("Couldn't connect to the store ", e); } catch (Throwable e) { - log.error("[EJM]: Caught a throwable ", e); + log.error("Caught a throwable ", e); } finally { try { - if (emailFolder != null) { + if (emailFolder != null && emailFolder.isOpen()) { emailFolder.close(false); } - if (store != null) { + if (store != null && store.isConnected()) { store.close(); } } catch (MessagingException e) { - log.error("[EJM]: Store close operation failed, couldn't close store", e); - } catch (Throwable e) { - log.error("[EJM]: Caught a throwable while closing email store ", e); + log.error("Store close operation failed, couldn't close store", e); + } + try { + Thread.sleep(ServerSettings.getEmailMonitorPeriod()); + } catch (InterruptedException e) { + log.error("interrupted while sleeping ", e); + } catch (Exception e) { + log.error("exception thrown when attempting to sleep ", e); } } } - log.info("[EJM]: Email monitoring daemon stopped"); + log.info("Email monitoring daemon stopped"); + } + + @Override + public void run() { + var thread = new Thread(this::runEmailMonitor, this.getClass().getSimpleName()); + thread.start(); + Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt)); } private void processMessages(Message[] searchMessages) throws MessagingException { diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java index 755cdc2e91..a1bfca1e42 100644 --- a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java +++ b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java @@ -37,11 +37,11 @@ public class MonitoringServer { this.port = port; } - public void start() throws IOException { + public void start() { try { logger.info("Starting the monitoring server"); httpServer = new HTTPServer(host, port, true); - } catch (IOException e) { + } catch (Exception e) { logger.error("Failed to start the monitoring server on host {} na port {}", host, port, e); } } @@ -49,7 +49,7 @@ public class MonitoringServer { public void stop() { if (httpServer != null) { logger.info("Stopping the monitor server"); - httpServer.stop(); + httpServer.close(); } } } diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java index 9b93ca8b67..5ac34f6c3f 100644 --- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java +++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java @@ -64,20 +64,35 @@ public class RealtimeMonitor extends AbstractMonitor { return consumer; } - private void runConsumer() throws ApplicationSettingsException { - final Consumer<String, String> consumer = createConsumer(); + private void runConsumer() { + final Consumer<String, String> consumer; + try { + consumer = createConsumer(); + } catch (ApplicationSettingsException e) { + logger.error("Error while creating consumer", e); + return; + } - while (true) { - final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); - RegistryService.Iface registry = getRegistry(); - consumerRecords.forEach(record -> { - try { - process(record.key(), record.value(), registry); - } catch (Exception e) { - logger.error("Error while processing message {}", record.value(), e); - } - }); - consumer.commitAsync(); + try { + while (true) { + final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); + RegistryService.Iface registry = getRegistry(); + consumerRecords.forEach(record -> { + try { + process(record.key(), record.value(), registry); + } catch (Exception e) { + logger.error("Error while processing message {}", record.value(), e); + } + }); + consumer.commitAsync(); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("RealtimeMonitor is interrupted!"); + } + } + } catch (InterruptedException ex) { + logger.error("RealtimeMonitor is interrupted! reason: " + ex, ex); + } finally { + consumer.close(); } } @@ -94,10 +109,8 @@ public class RealtimeMonitor extends AbstractMonitor { @Override public void run() { - try { - runConsumer(); - } catch (ApplicationSettingsException e) { - logger.error("Error while running consumer", e); - } + var thread = new Thread(this::runConsumer, this.getClass().getSimpleName()); + thread.start(); + Runtime.getRuntime().addShutdownHook(new Thread(thread::interrupt)); } } diff --git a/airavata-api/src/main/resources/airavata-server.properties b/airavata-api/src/main/resources/airavata-server.properties index 66951740c2..febed3a742 100644 --- a/airavata-api/src/main/resources/airavata-server.properties +++ b/airavata-api/src/main/resources/airavata-server.properties @@ -70,6 +70,7 @@ workflowcatalog.jdbc.url=jdbc:mariadb://airavata.host:13306/workflow_catalog workflowcatalog.jdbc.user=airavata workflowcatalog.validationQuery=SELECT 1 from CONFIGURATION +email.based.monitor.host=imap.gmail.com [email protected] email.based.monitor.folder.name=INBOX email.based.monitor.password=123456 diff --git a/airavata-api/src/main/resources/distribution/bin/setenv.sh b/airavata-api/src/main/resources/distribution/bin/setenv.sh new file mode 100755 index 0000000000..9024de1d7b --- /dev/null +++ b/airavata-api/src/main/resources/distribution/bin/setenv.sh @@ -0,0 +1,146 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Resolve symlinks to get the real script location +PRG="$0" +while [ -L "$PRG" ]; do + PRG=$(readlink "$PRG") +done +PRGDIR=$(dirname "$PRG") + +# Set AIRAVATA_HOME if not already set +[ -z "$AIRAVATA_HOME" ] && AIRAVATA_HOME=$(cd "$PRGDIR/.." && pwd) + +# Build CLASSPATH from all JAR files +CLASSPATH=$(printf "%s:" "$AIRAVATA_HOME"/lib/*.jar) +CLASSPATH=${CLASSPATH%:} # Remove trailing colon + +export AIRAVATA_HOME CLASSPATH + +# Common function to run Airavata services +# Usage: run_service <service_name> <main_class> <java_opts> +run_service() { + local SERVICE_NAME="$1" MAIN_CLASS="$2" JAVA_OPTS="$3" + # Export SERVICE_NAME as environment variable for log4j2 configuration + export SERVICE_NAME + local CWD="$PWD" PID_PATH_NAME="${AIRAVATA_HOME}/bin/pid-${SERVICE_NAME}" + local DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/${SERVICE_NAME}.log" + local LOG_FILE="$DEFAULT_LOG_FILE" DAEMON_MODE=false EXTRA_ARGS="" + + # Help text + local HELP_TEXT="Usage: ${SERVICE_NAME}.sh + +command options: + -d Run in daemon mode + -xdebug Start ${SERVICE_NAME} under JPDA debugger + -log <LOG_FILE> Where to redirect stdout/stderr (defaults to $DEFAULT_LOG_FILE) + -h Display this help and exit + +Daemon mode commands (use with -d): + start Start server in daemon mode + stop Stop server running in daemon mode + restart Restart server in daemon mode" + + cd "${AIRAVATA_HOME}/bin" + + # Helper function to stop daemon process + stop_daemon() { + if [[ -f "$PID_PATH_NAME" ]]; then + local PID=$(cat "$PID_PATH_NAME") + echo "$SERVICE_NAME stopping..." + pkill -P "$PID" + kill "$PID" + + local retry=0 + while kill -0 "$PID" 2>/dev/null && ((retry++ < 20)); do + echo "[PID: $PID] Waiting for process to stop..." + sleep 1 + done + + if kill -0 "$PID" 2>/dev/null; then + echo "[PID: $PID] Forcefully killing non-responsive process..." + pkill -9 -P "$PID" + kill -9 "$PID" + fi + + echo "$SERVICE_NAME is now stopped." + rm "$PID_PATH_NAME" + return 0 + else + echo "$SERVICE_NAME is not running." + return 1 + fi + } + + # Helper function to start daemon process + start_daemon() { + echo "Starting $SERVICE_NAME ..." + if [[ ! -f "$PID_PATH_NAME" ]]; then + nohup java $JAVA_OPTS -classpath "$CLASSPATH" "$MAIN_CLASS" "$@" >"$LOG_FILE" 2>&1 & + echo $! >"$PID_PATH_NAME" + echo "$SERVICE_NAME now running: PID $(cat "$PID_PATH_NAME")" + else + echo "$SERVICE_NAME already running: PID $(cat "$PID_PATH_NAME")" + fi + } + + # Parse command arguments + while (($# > 0)); do + case "$1" in + -d) DAEMON_MODE=true ;; + -xdebug) JAVA_OPTS+=" -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,address=*:8000" ;; + -log) + shift + LOG_FILE="$1" + [[ "$LOG_FILE" != /* ]] && LOG_FILE="${CWD}/${LOG_FILE}" + ;; + start | stop | restart) + if [[ "$DAEMON_MODE" == true ]]; then + case "$1" in + start) start_daemon "$@" ;; + stop) stop_daemon ;; + restart) + stop_daemon + start_daemon "$@" + ;; + esac + exit 0 + else + EXTRA_ARGS+=" $1" + fi + ;; + -h) + echo "$HELP_TEXT" + exit 0 + ;; + *) EXTRA_ARGS+=" $1" ;; + esac + shift + done + + # Validate daemon mode usage + if [[ "$DAEMON_MODE" == true ]]; then + echo "Error: Daemon mode (-d) requires one of: start, stop, restart" + echo "Use -h for help" + exit 1 + fi + + # Run in foreground mode + java $JAVA_OPTS -classpath "$CLASSPATH" "$MAIN_CLASS" $EXTRA_ARGS +} diff --git a/airavata-api/src/main/resources/email-config.yaml b/airavata-api/src/main/resources/email-config.yml similarity index 100% rename from airavata-api/src/main/resources/email-config.yaml rename to airavata-api/src/main/resources/email-config.yml diff --git a/airavata-api/src/main/resources/log4j2.xml b/airavata-api/src/main/resources/log4j2.xml index 8fb27b1b68..5b32d38646 100644 --- a/airavata-api/src/main/resources/log4j2.xml +++ b/airavata-api/src/main/resources/log4j2.xml @@ -32,6 +32,7 @@ <logger name="org.apache.zookeeper" level="ERROR" /> <logger name="org.apache.airavata" level="INFO" /> <logger name="org.hibernate" level="ERROR" /> + <logger name="org.apache.commons.beanutils.converters" level="ERROR" /> <Root level="INFO"> <AppenderRef ref="Console" /> </Root> diff --git a/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2 b/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2 index 42124afc90..a5972e0e40 100644 --- a/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2 +++ b/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2 @@ -179,6 +179,7 @@ credential.store.jdbc.validationQuery=SELECT 1 from CONFIGURATION monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor #These properties will used to enable email base monitoring +email.based.monitor.host={{ monitor_email_host }} email.based.monitor.address={{ monitor_email_address }} email.based.monitor.password={{ monitor_email_password }} email.based.monitor.folder.name=INBOX diff --git a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 index 37ed894f8b..97f698f029 100644 --- a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 +++ b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 @@ -34,7 +34,9 @@ security.manager.class=org.apache.airavata.service.security.KeyCloakSecurityMana ########################################################################### # Monitoring module Configuration ########################################################################### +email.based.monitor.host={{ monitor_email_host }} email.based.monitor.address={{ monitor_email_address }} +email.based.monitor.password={{ monitor_email_password }} job.notification.emailids= job.notification.enabled=true diff --git a/dev-tools/deployment-scripts/airavata-server.properties b/dev-tools/deployment-scripts/airavata-server.properties index 400e3c01cc..30b92b9e72 100644 --- a/dev-tools/deployment-scripts/airavata-server.properties +++ b/dev-tools/deployment-scripts/airavata-server.properties @@ -126,6 +126,7 @@ job.monitor.realtime.publisher.id=RealtimeProducer # email.based.monitor.folder.name=INBOX # email.expiration.minutes=60 # email.based.monitoring.period=10000 +# email.based.monitor.host=CHANGEME # [email protected] # email.based.monitor.password=app_password_here
