This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch merge-svcs in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 807ee5c7b327de36369793eea5cfa2066994d2bd Author: yasithdev <[email protected]> AuthorDate: Tue Aug 19 00:29:16 2025 -0400 initial merge of all thrift apis --- .vscode/launch.json | 21 +- .../src/main/java/org/apache/airavata/Main.java | 62 ++++ .../helix/core/participant/HelixParticipant.java | 37 +-- .../helix/impl/controller/HelixController.java | 79 ++--- .../helix/impl/participant/GlobalParticipant.java | 30 +- .../helix/impl/workflow/ParserWorkflowManager.java | 29 +- .../helix/impl/workflow/PostWorkflowManager.java | 28 +- .../helix/impl/workflow/PreWorkflowManager.java | 12 +- .../helix/impl/workflow/WorkflowManager.java | 2 +- .../apache/airavata/monitor/AbstractMonitor.java | 2 +- .../airavata/monitor/email/EmailBasedMonitor.java | 12 +- .../airavata/monitor/realtime/RealtimeMonitor.java | 11 +- .../org/apache/airavata/server/ServerMain.java | 333 ++++----------------- .../apache/airavata/server/ThriftAPIServer.java | 223 ++++++++++++++ modules/ide-integration/.gitignore | 1 - modules/ide-integration/README.md | 294 ------------------ modules/ide-integration/pom.xml | 56 ---- .../airavata/ide/integration/APIServerStarter.java | 49 --- .../airavata/ide/integration/JobEngineStarter.java | 73 ----- .../ide/integration/JobMonitorStarter.java | 29 -- pom.xml | 1 - 21 files changed, 455 insertions(+), 929 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 9a39c30cdc..62150d2f6c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,25 +2,10 @@ "configurations": [ { "type": "java", - "name": "Start API Server", + "name": "Main", "request": "launch", - "cwd": "${workspaceFolder}/modules/ide-integration", - "mainClass": "org.apache.airavata.ide.integration.APIServerStarter", - "vmArgs": "-javaagent:/home/vscode/.m2/repository/org/apache/openjpa/openjpa/4.0.1/openjpa-4.0.1.jar", - }, - { - "type": "java", - "name": "Start Job Engine", - "request": "launch", - "cwd": "${workspaceFolder}/modules/ide-integration", - "mainClass": "org.apache.airavata.ide.integration.JobEngineStarter" - }, - { - "type": "java", - "name": "Start Job Monitor", - "request": "launch", - "cwd": "${workspaceFolder}/modules/ide-integration", - "mainClass": "org.apache.airavata.ide.integration.JobMonitorStarter" + "mainClass": "org.apache.airavata.Main", + "projectName": "airavata-api" } ], } \ No newline at end of file diff --git a/airavata-api/src/main/java/org/apache/airavata/Main.java b/airavata-api/src/main/java/org/apache/airavata/Main.java new file mode 100644 index 0000000000..d60f122ed1 --- /dev/null +++ b/airavata-api/src/main/java/org/apache/airavata/Main.java @@ -0,0 +1,62 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.airavata; + +import org.apache.airavata.db.event.manager.DBEventManagerRunner; +import org.apache.airavata.helix.impl.controller.HelixController; +import org.apache.airavata.helix.impl.participant.GlobalParticipant; +import org.apache.airavata.helix.impl.workflow.PostWorkflowManager; +import org.apache.airavata.helix.impl.workflow.PreWorkflowManager; +import org.apache.airavata.monitor.email.EmailBasedMonitor; +import org.apache.airavata.server.ThriftAPIServer; + +public class Main { + + public static void main(String[] args) throws Exception { + + System.out.println("Starting Thrift API Server ......."); + var thriftApiServer = new ThriftAPIServer(); + thriftApiServer.start(); + + System.out.println("Starting DB Event Manager Runner ......."); + var dbEventManagerRunner = new DBEventManagerRunner(); + dbEventManagerRunner.start(); + + System.out.println("Starting Helix Controller ......."); + var helixController = new HelixController(); + helixController.start(); + + System.out.println("Starting Helix Participant ......."); + var globalParticipant = new GlobalParticipant(); + globalParticipant.run(); + + System.out.println("Starting Pre Workflow Manager ......."); + var preWorkflowManager = new PreWorkflowManager(); + preWorkflowManager.run(); + + System.out.println("Starting Post Workflow Manager ......."); + var postWorkflowManager = new PostWorkflowManager(); + postWorkflowManager.run(); + + System.out.println("Starting Email Based Monitor ......."); + var emailBasedMonitor = new EmailBasedMonitor(); + emailBasedMonitor.run(); + } +} diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java index eff9eeca88..0ea915cac8 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java @@ -19,18 +19,19 @@ */ package org.apache.airavata.helix.core.participant; +import java.lang.reflect.InvocationTargetException; import java.util.*; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.support.TaskHelperImpl; import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.patform.monitoring.MonitoringServer; import org.apache.helix.InstanceType; +import org.apache.helix.constants.InstanceConstants.InstanceOperation; import org.apache.helix.examples.OnlineOfflineStateModelFactory; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixManager; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.InstanceConfig; import org.apache.helix.participant.StateMachineEngine; @@ -116,11 +117,11 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { TaskFactory taskFac = context -> { try { return AbstractTask.class - .cast(taskClass.newInstance()) + .cast(taskClass.getDeclaredConstructor().newInstance()) .setParticipant(HelixParticipant.this) .setCallbackContext(context) .setTaskHelper(new TaskHelperImpl()); - } catch (InstantiationException | IllegalAccessException e) { + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { logger.error( "Failed to initialize the task: " + context.getTaskConfig().getId(), @@ -135,21 +136,14 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { } public void run() { - ZkClient zkClient = null; + ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin.Builder().setZkAddress(zkAddress).build(); try { - zkClient = new ZkClient( - zkAddress, - ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, - new ZNRecordSerializer()); - ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); - List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName); if (!nodesInCluster.contains(participantName)) { InstanceConfig instanceConfig = new InstanceConfig(participantName); instanceConfig.setHostName("localhost"); - instanceConfig.setInstanceEnabled(true); + instanceConfig.setInstanceOperation(InstanceOperation.ENABLE); if (taskTypeName != null) { instanceConfig.addTag(taskTypeName); } @@ -159,14 +153,14 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { if (taskTypeName != null) { zkHelixAdmin.addInstanceTag(clusterName, participantName, taskTypeName); } - zkHelixAdmin.enableInstance(clusterName, participantName, true); + zkHelixAdmin.setInstanceOperation(clusterName, participantName, InstanceOperation.ENABLE); logger.debug("Participant: " + participantName + " has been re-enabled at the cluster: " + clusterName); } Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.debug("Participant: " + participantName + " shutdown hook called"); try { - zkHelixAdmin.enableInstance(clusterName, participantName, false); + zkHelixAdmin.setInstanceOperation(clusterName, participantName, InstanceOperation.DISABLE); } catch (Exception e) { logger.warn("Participant: " + participantName + " was not disabled normally", e); } @@ -178,8 +172,8 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { } catch (Exception ex) { logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex); } finally { - if (zkClient != null) { - zkClient.close(); + if (zkHelixAdmin != null) { + zkHelixAdmin.close(); } } } @@ -201,7 +195,16 @@ public class HelixParticipant<T extends AbstractTask> implements Runnable { zkHelixManager.connect(); logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName); + if (ServerSettings.getBooleanSetting("participant.monitoring.enabled")) { + System.out.println("Starting participant monitoring server ......."); + var monitoringServer = new MonitoringServer( + ServerSettings.getSetting("participant.monitoring.host"), + ServerSettings.getIntSetting("participant.monitoring.port")); + monitoringServer.start(); + Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); + } Thread.currentThread().join(); + } catch (InterruptedException ex) { logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex); } catch (Exception ex) { diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java index f41afbaed2..0139cca7fd 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java @@ -19,13 +19,10 @@ */ package org.apache.airavata.helix.impl.controller; -import java.util.concurrent.CountDownLatch; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +41,6 @@ public class HelixController implements Runnable { private String zkAddress; private org.apache.helix.HelixManager zkHelixManager; - private CountDownLatch startLatch = new CountDownLatch(1); - private CountDownLatch stopLatch = new CountDownLatch(1); - @SuppressWarnings("WeakerAccess") public HelixController() throws ApplicationSettingsException { this.clusterName = ServerSettings.getSetting("helix.cluster.name"); @@ -56,74 +50,39 @@ public class HelixController implements Runnable { public void run() { try { - ZkClient zkClient = new ZkClient( - ServerSettings.getZookeeperConnection(), - ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, - new ZNRecordSerializer()); - ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); - - // Creates the zk cluster if not available - if (!zkHelixAdmin.getClusters().contains(clusterName)) { - zkHelixAdmin.addCluster(clusterName, true); - } - + var zkHelixAdmin = new ZKHelixAdmin.Builder() + .setZkAddress(ServerSettings.getZookeeperConnection()) + .build(); + zkHelixAdmin.addCluster(clusterName, false); zkHelixAdmin.close(); - zkClient.close(); - - logger.info("Connection to helix cluster : " + clusterName + " with name : " + controllerName); - logger.info("Zookeeper connection string " + zkAddress); + logger.info("Starting helix controller '{}' for cluster '{}' at address '{}'", controllerName, clusterName, zkAddress); zkHelixManager = HelixControllerMain.startHelixController( zkAddress, clusterName, controllerName, HelixControllerMain.STANDALONE); - startLatch.countDown(); - stopLatch.await(); + logger.info("Controller '{}' started for cluster '{}'", controllerName, clusterName); + Thread.currentThread().join(); + } catch (InterruptedException ie) { + logger.info("Helix controller interrupted, shutting down."); } catch (Exception ex) { - logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex); + logger.error("Error in run() for controller '{}', reason: {}", controllerName, ex, ex); } finally { - disconnect(); - } - } - - public void startServer() throws Exception { - - // WorkflowCleanupAgent cleanupAgent = new WorkflowCleanupAgent(); - // cleanupAgent.init(); - // ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - // executor.scheduleWithFixedDelay(cleanupAgent, 10, 120, TimeUnit.SECONDS); - - new Thread(this).start(); - try { - startLatch.await(); - logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName); - - Runtime.getRuntime().addShutdownHook(new Thread(this::disconnect)); - - } catch (InterruptedException ex) { - logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex); + if (zkHelixManager != null) { + logger.info("Controller '{}' has disconnected from cluster '{}'", controllerName, clusterName); + zkHelixManager.disconnect(); + } } } - @SuppressWarnings({"WeakerAccess", "unused"}) - public void stop() { - stopLatch.countDown(); - } - - private void disconnect() { - if (zkHelixManager != null) { - logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName); - zkHelixManager.disconnect(); - } + public void start() throws Exception { + Thread controllerThread = new Thread(this); + controllerThread.start(); + Runtime.getRuntime().addShutdownHook(new Thread(controllerThread::interrupt)); } public static void main(String args[]) { try { - logger.info("Starting helix controller"); - - HelixController helixController = new HelixController(); - helixController.startServer(); - + new HelixController().start(); } catch (Exception e) { logger.error("Failed to start the helix controller", e); } diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java index 3cb6be524c..03c3772a72 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java @@ -55,10 +55,21 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> { "org.apache.airavata.helix.impl.task.aws.AWSCompletingTask", }; + public static List<Class<? extends AbstractTask>> taskClasses = new ArrayList<>(); + + static { + for (String taskClassName : TASK_CLASS_NAMES) { + try { + taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class)); + } catch (ClassNotFoundException e) { + logger.error("Failed to load task class: " + taskClassName, e); + } + } + } + @SuppressWarnings("WeakerAccess") - public GlobalParticipant(List<Class<? extends AbstractTask>> taskClasses, String taskTypeName) - throws ApplicationSettingsException { - super(taskClasses, taskTypeName); + public GlobalParticipant() throws ApplicationSettingsException { + super(GlobalParticipant.taskClasses, null); } public void startServer() { @@ -70,27 +81,16 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> { public static void main(String args[]) { logger.info("Starting global participant"); - try { - ArrayList<Class<? extends AbstractTask>> taskClasses = new ArrayList<>(); - - for (String taskClassName : TASK_CLASS_NAMES) { - logger.debug("Adding task class: " + taskClassName + " to the global participant"); - taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class)); - } - if (ServerSettings.getBooleanSetting("participant.monitoring.enabled")) { MonitoringServer monitoringServer = new MonitoringServer( ServerSettings.getSetting("participant.monitoring.host"), ServerSettings.getIntSetting("participant.monitoring.port")); monitoringServer.start(); - Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); } - - GlobalParticipant participant = new GlobalParticipant(taskClasses, null); + GlobalParticipant participant = new GlobalParticipant(); participant.startServer(); - } catch (Exception e) { logger.error("Failed to start global participant", e); } diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java index e75f494210..7551f0855f 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java @@ -64,9 +64,10 @@ public class ParserWorkflowManager extends WorkflowManager { Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters"))); } - public static void main(String[] args) throws Exception { - - if (ServerSettings.getBooleanSetting("parser.workflow.manager.monitoring.enabled")) { + @Override + public void run() { + try { + if (ServerSettings.getBooleanSetting("parser.workflow.manager.monitoring.enabled")) { MonitoringServer monitoringServer = new MonitoringServer( ServerSettings.getSetting("parser.workflow.manager.monitoring.host"), ServerSettings.getIntSetting("parser.workflow.manager.monitoring.port")); @@ -74,10 +75,15 @@ public class ParserWorkflowManager extends WorkflowManager { Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); } + this.init(); + this.runConsumer(); + } catch (Exception e) { + logger.error("Error starting PreWorkflowManager", e); + } + } - ParserWorkflowManager manager = new ParserWorkflowManager(); - manager.init(); - manager.runConsumer(); + public static void main(String[] args) throws Exception { + new ParserWorkflowManager().run(); } private void init() throws Exception { @@ -89,12 +95,13 @@ public class ParserWorkflowManager extends WorkflowManager { RegistryService.Client registryClient = getRegistryClientPool().getResource(); try { - ProcessModel processModel; - ApplicationInterfaceDescription appDescription; try { - processModel = registryClient.getProcess(completionMessage.getProcessId()); - appDescription = registryClient.getApplicationInterface(processModel.getApplicationInterfaceId()); - + ProcessModel processModel = registryClient.getProcess(completionMessage.getProcessId()); + ApplicationInterfaceDescription appDescription = registryClient.getApplicationInterface(processModel.getApplicationInterfaceId()); + if (appDescription == null) { + logger.error("Application interface not found for process id " + completionMessage.getProcessId()); + throw new Exception("Application interface not found for process id " + completionMessage.getProcessId()); + } } catch (Exception e) { logger.error( "Failed to fetch process or application description from registry associated with process id " diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index a23d1b5edf..ac85fdeec0 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -68,19 +68,25 @@ public class PostWorkflowManager extends WorkflowManager { Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters"))); } - public static void main(String[] args) throws Exception { - - if (ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) { - MonitoringServer monitoringServer = new MonitoringServer( - ServerSettings.getSetting("post.workflow.manager.monitoring.host"), - ServerSettings.getIntSetting("post.workflow.manager.monitoring.port")); - monitoringServer.start(); - - Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); + @Override + public void run() { + try { + if (ServerSettings.getBooleanSetting("post.workflow.manager.monitoring.enabled")) { + MonitoringServer monitoringServer = new MonitoringServer( + ServerSettings.getSetting("post.workflow.manager.monitoring.host"), + ServerSettings.getIntSetting("post.workflow.manager.monitoring.port")); + monitoringServer.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); + } + startServer(); + } catch (Exception e) { + logger.error("Error starting PreWorkflowManager", e); } + } - PostWorkflowManager postManager = new PostWorkflowManager(); - postManager.startServer(); + public static void main(String[] args) throws Exception { + new PostWorkflowManager().run(); } private void init() throws Exception { diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java index 19b31fbb4a..6102143d58 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java @@ -287,6 +287,15 @@ public class PreWorkflowManager extends WorkflowManager { return workflow; } + @Override + public void run() { + try { + startServer(); + } catch (Exception e) { + logger.error("Error starting PreWorkflowManager", e); + } + } + public static void main(String[] args) throws Exception { if (ServerSettings.getBooleanSetting("pre.workflow.manager.monitoring.enabled")) { @@ -298,8 +307,7 @@ public class PreWorkflowManager extends WorkflowManager { Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); } - PreWorkflowManager preWorkflowManager = new PreWorkflowManager(); - preWorkflowManager.startServer(); + new PreWorkflowManager().run(); } private class ProcessLaunchMessageHandler implements MessageHandler { diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java index eb32f19098..daf3bb4519 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java @@ -46,7 +46,7 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient.RealmMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WorkflowManager { +public abstract class WorkflowManager implements Runnable { private static final Logger logger = LoggerFactory.getLogger(WorkflowManager.class); diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java index ce29676cc8..d9b2f4423a 100644 --- a/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java +++ b/airavata-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java @@ -31,7 +31,7 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AbstractMonitor { +public abstract class AbstractMonitor implements Runnable { private static final Logger log = LoggerFactory.getLogger(AbstractMonitor.class); diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java index 5c4aecede3..fcca0561b5 100644 --- a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java +++ b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java @@ -44,7 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; -public class EmailBasedMonitor extends AbstractMonitor implements Runnable { +public class EmailBasedMonitor extends AbstractMonitor { private static final Logger log = LoggerFactory.getLogger(EmailBasedMonitor.class); @@ -301,14 +301,8 @@ public class EmailBasedMonitor extends AbstractMonitor implements Runnable { } } - public void startServer() throws InterruptedException { - Thread t = new Thread(this); - t.start(); - t.join(); - } - public static void main(String[] args) throws Exception { - EmailBasedMonitor monitor = new EmailBasedMonitor(); - monitor.startServer(); + new EmailBasedMonitor().run(); } + } diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java index dde0582d8f..13e4da2b2c 100644 --- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java +++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java @@ -93,7 +93,16 @@ public class RealtimeMonitor extends AbstractMonitor { } } + @Override + public void run() { + try { + runConsumer(); + } catch (ApplicationSettingsException e) { + logger.error("Error while running consumer", e); + } + } + public static void main(String args[]) throws ApplicationSettingsException { - new RealtimeMonitor().runConsumer(); + new RealtimeMonitor().run(); } } diff --git a/airavata-api/src/main/java/org/apache/airavata/server/ServerMain.java b/airavata-api/src/main/java/org/apache/airavata/server/ServerMain.java index 701d5e4dd6..2306c3d6a1 100644 --- a/airavata-api/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/airavata-api/src/main/java/org/apache/airavata/server/ServerMain.java @@ -19,82 +19,67 @@ */ package org.apache.airavata.server; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.List; + +import org.apache.airavata.api.server.AiravataAPIServer; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.*; import org.apache.airavata.common.utils.ApplicationSettings.ShutdownStrategy; import org.apache.airavata.common.utils.IServer.ServerStatus; -import org.apache.airavata.common.utils.StringUtil.CommandLineParameters; +import org.apache.airavata.credential.store.server.CredentialStoreServer; +import org.apache.airavata.db.event.manager.DBEventManagerRunner; +import org.apache.airavata.helix.impl.controller.HelixController; +import org.apache.airavata.helix.impl.participant.GlobalParticipant; +import org.apache.airavata.helix.impl.workflow.PostWorkflowManager; +import org.apache.airavata.helix.impl.workflow.PreWorkflowManager; +import org.apache.airavata.monitor.email.EmailBasedMonitor; +import org.apache.airavata.monitor.realtime.RealtimeMonitor; +import org.apache.airavata.orchestrator.server.OrchestratorServer; import org.apache.airavata.patform.monitoring.MonitoringServer; +import org.apache.airavata.registry.api.service.RegistryAPIServer; +import org.apache.airavata.service.profile.server.ProfileServiceServer; +import org.apache.airavata.sharing.registry.server.SharingRegistryServer; import org.apache.commons.cli.ParseException; -import org.apache.zookeeper.server.ServerCnxnFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServerMain { private static List<IServer> servers; - private static final String SERVERS_KEY = "servers"; + private static List<Class<?>> additionalServers; private static final Logger logger = LoggerFactory.getLogger(ServerMain.class); private static boolean serversLoaded = false; - private static final String stopFileNamePrefix = "server_stop"; - private static long serverPID = -1; - private static final String serverStartedFileNamePrefix = "server_start"; private static boolean systemShutDown = false; - private static String STOP_COMMAND_STR = "stop"; - - private static final String ALL_IN_ONE = "all"; - private static final String API_ORCH = "api-orch"; - private static final String EXECUTION = "execution"; - // server names - private static final String API_SERVER = "apiserver.class"; - private static final String CREDENTIAL_STORE = "credential.store.class"; - private static final String REGISTRY_SERVER = "regserver"; - private static final String SHARING_SERVER = "sharing_server"; - private static final String GFAC_SERVER = "gfac"; - private static final String ORCHESTRATOR = "orchestrator"; - private static final String PROFILE_SERVICE = "profile_service.class"; - private static final String DB_EVENT_MANAGER = "db_event_manager.class"; - private static ServerCnxnFactory cnxnFactory; - // private static boolean shutdownHookCalledBefore=false; static { - servers = new ArrayList<IServer>(); - } - - private static void loadServers(String serverNames) { - try { - if (serverNames != null) { - List<String> serversList = handleServerDependencies(serverNames); - for (String serverString : serversList) { - serverString = serverString.trim(); - String serverClassName = ServerSettings.getSetting(serverString); - Class<?> classInstance; - try { - classInstance = ServerMain.class.getClassLoader().loadClass(serverClassName); - servers.add((IServer) classInstance.newInstance()); - } catch (ClassNotFoundException e) { - logger.error("Error while locating server implementation \"" + serverString + "\"!!!", e); - } catch (InstantiationException e) { - logger.error("Error while initiating server instance \"" + serverString + "\"!!!", e); - } catch (IllegalAccessException e) { - logger.error("Error while initiating server instance \"" + serverString + "\"!!!", e); - } catch (ClassCastException e) { - logger.error("Invalid server \"" + serverString + "\"!!!", e); - } - } - } else { - logger.warn("No server name specify to start, use -h command line option to view help menu ..."); - } - } catch (ApplicationSettingsException e) { - logger.error("Error while retrieving server list!!!", e); - } + servers = new ArrayList<>(); + additionalServers = new ArrayList<>(); + } + + private static void loadServers() { + servers.clear(); + additionalServers.clear(); + + servers.addAll(Arrays.asList( + new DBEventManagerRunner(), + new RegistryAPIServer(), + new CredentialStoreServer(), + new SharingRegistryServer(), + new AiravataAPIServer(), + new OrchestratorServer(), + new ProfileServiceServer() + )); + + additionalServers.addAll(Arrays.asList( + HelixController.class, + GlobalParticipant.class, + EmailBasedMonitor.class, + RealtimeMonitor.class, + PreWorkflowManager.class, + PostWorkflowManager.class + )); serversLoaded = true; Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { @@ -104,65 +89,6 @@ public class ServerMain { }); } - private static List<String> handleServerDependencies(String serverNames) { - List<String> serverList = new ArrayList<>(Arrays.asList(serverNames.split(","))); - if (serverList.indexOf(ALL_IN_ONE) > -1) { - serverList.clear(); - serverList.add(DB_EVENT_MANAGER); // DB Event Manager should start before everything - serverList.add(REGISTRY_SERVER); // registry server should start before everything else - serverList.add(CREDENTIAL_STORE); // credential store should start before api server - serverList.add(SHARING_SERVER); - serverList.add(API_SERVER); - serverList.add(ORCHESTRATOR); - serverList.add(GFAC_SERVER); - serverList.add(PROFILE_SERVICE); - } else if (serverList.indexOf(API_ORCH) > -1) { - serverList.clear(); - serverList.add(DB_EVENT_MANAGER); // DB Event Manager should start before everything - serverList.add(REGISTRY_SERVER); // registry server should start before everything else - serverList.add(CREDENTIAL_STORE); // credential store should start before api server - serverList.add(SHARING_SERVER); - serverList.add(API_SERVER); - serverList.add(ORCHESTRATOR); - serverList.add(PROFILE_SERVICE); - } else if (serverList.indexOf(EXECUTION) > -1) { - serverList.clear(); - serverList.add(GFAC_SERVER); - } else { - // registry server should start before everything - int regPos = serverList.indexOf(REGISTRY_SERVER); - if (regPos > 0) { - String temp = serverList.get(0); - serverList.set(0, serverList.get(regPos)); - serverList.set(regPos, temp); - } - - // credential store should start before api server - int credPos = serverList.indexOf(CREDENTIAL_STORE); - int apiPos = serverList.indexOf(API_SERVER); - if (credPos >= 0 && apiPos >= 0 && (credPos > apiPos)) { - String temp = serverList.get(apiPos); - serverList.set(apiPos, serverList.get(credPos)); - serverList.set(credPos, temp); - } - } - return serverList; - } - - // private static void addSecondaryShutdownHook(){ - // Runtime.getRuntime().addShutdownHook(new Thread(){ - // @Override - // public void run() { - // System.out.print("Graceful shutdown attempt is still active. Do you want to exit instead? (y/n)"); - // String command=System.console().readLine().trim().toLowerCase(); - // if (command.equals("yes") || command.equals("y")){ - // System.exit(1); - // } - - // } - // }); - // } - public static void main(String[] args) throws IOException, AiravataException, ParseException { ServerSettings.mergeSettingsCommandLineArgs(args); if (ServerSettings.getBooleanSetting("api.server.monitoring.enabled")) { @@ -174,34 +100,20 @@ public class ServerMain { Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop)); } - CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args); - if (commandLineParameters.getArguments().contains(STOP_COMMAND_STR)) { - performServerStopRequest(commandLineParameters); - } else { - performServerStart(args); - } - } - - private static void performServerStart(String[] args) { - setServerStarted(); logger.info("Airavata server instance starting..."); - String serverNames = "all"; - for (String string : args) { - logger.info("Server Arguments: " + string); - if (string.startsWith("--servers=")) { - serverNames = string.substring("--servers=".length()); - } - } - serverNames = ApplicationSettings.getSetting(SERVERS_KEY, serverNames); - startAllServers(serverNames); - while (!hasStopRequested()) { - try { + startAllServers(); + + // Wait until SIGTERM or KeyboardInterrupt (Ctrl+C) triggers shutdown hook. + try { + while (!isSystemShutDown()) { Thread.sleep(2000); - } catch (InterruptedException e) { - stopAllServers(); } + } catch (InterruptedException e) { + logger.info("Interrupted, shutting down servers..."); + setSystemShutDown(); } - if (hasStopRequested()) { + + if (isSystemShutDown()) { ServerSettings.setStopAllThreads(true); stopAllServers(); ShutdownStrategy shutdownStrategy; @@ -222,111 +134,6 @@ public class ServerMain { } } - private static void performServerStopRequest(CommandLineParameters commandLineParameters) throws IOException { - // deleteOldStartRecords(); - String serverIndexOption = "serverIndex"; - if (commandLineParameters.getParameters().containsKey(serverIndexOption)) { - serverPID = Integer.parseInt(commandLineParameters.getParameters().get(serverIndexOption)); - } - if (isServerRunning()) { - logger.info("Requesting airavata server" + (serverPID == -1 ? "(s)" : " instance " + serverPID) - + " to stop..."); - requestStop(); - while (isServerRunning()) { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - logger.info("Server" + (serverPID == -1 ? "(s)" : " instance " + serverPID) + " stopped!!!"); - } else { - logger.error("Server" + (serverPID == -1 ? "" : " instance " + serverPID) + " is not running!!!"); - } - if (ServerSettings.isEmbeddedZK()) { - cnxnFactory.shutdown(); - } - } - - @SuppressWarnings("resource") - private static void requestStop() throws IOException { - File file = new File(getServerStopFileName()); - file.createNewFile(); - new RandomAccessFile(file, "rw").getChannel().lock(); - file.deleteOnExit(); - } - - private static boolean hasStopRequested() { - return isSystemShutDown() - || new File(getServerStopFileName()).exists() - || new File(stopFileNamePrefix).exists(); - } - - private static String getServerStopFileName() { - return stopFileNamePrefix; - } - - private static void deleteOldStopRequests() { - File[] files = new File(".").listFiles(); - for (File file : files) { - if (file.getName().contains(stopFileNamePrefix)) { - file.delete(); - } - } - } - - // private static void deleteOldStartRecords(){ - // File[] files = new File(".").listFiles(); - // for (File file : files) { - // if (file.getName().contains(serverStartedFileNamePrefix)){ - // try { - // new FileOutputStream(file); - // file.delete(); - // } catch (Exception e) { - // //file is locked which means there's an active process using it - // } - // } - // } - // } - - private static boolean isServerRunning() { - if (serverPID == -1) { - String[] files = new File(".").list(); - for (String file : files) { - if (file.contains(serverStartedFileNamePrefix)) { - return true; - } - } - return false; - } else { - return new File(getServerStartedFileName()).exists(); - } - } - - @SuppressWarnings({"resource"}) - private static void setServerStarted() { - try { - serverPID = getPID(); - deleteOldStopRequests(); - File serverStartedFile = null; - serverStartedFile = new File(getServerStartedFileName()); - serverStartedFile.createNewFile(); - serverStartedFile.deleteOnExit(); - new RandomAccessFile(serverStartedFile, "rw").getChannel().lock(); - } catch (FileNotFoundException e) { - logger.error(e.getMessage(), e); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - } - - private static String getServerStartedFileName() { - return new File( - new File(System.getenv("AIRAVATA_HOME"), "bin"), - serverStartedFileNamePrefix + "_" + Long.toString(serverPID)) - .toString(); - } - public static void stopAllServers() { // stopping should be done in reverse order of starting the servers for (int i = servers.size() - 1; i >= 0; i--) { @@ -339,9 +146,9 @@ public class ServerMain { } } - public static void startAllServers(String serversNames) { + public static void startAllServers() { if (!serversLoaded) { - loadServers(serversNames); + loadServers(); } for (IServer server : servers) { try { @@ -358,9 +165,6 @@ public class ServerMain { private static void waitForServerToStart(IServer server, Integer maxWait) throws Exception { int count = 0; - // if (server.getStatus()==ServerStatus.STARTING) { - // logger.info("Waiting for " + server.getName() + " to start..."); - // } while (server.getStatus() == ServerStatus.STARTING && (maxWait == null || count < maxWait)) { Thread.sleep(SERVER_STATUS_CHANGE_WAIT_INTERVAL); count += SERVER_STATUS_CHANGE_WAIT_INTERVAL; @@ -375,8 +179,6 @@ public class ServerMain { if (server.getStatus() == ServerStatus.STOPING) { logger.info("Waiting for " + server.getName() + " to stop..."); } - // we are doing hasStopRequested() check because while we are stuck in the loop to stop there could be a - // forceStop request while (server.getStatus() == ServerStatus.STOPING && (maxWait == null || count < maxWait)) { Thread.sleep(SERVER_STATUS_CHANGE_WAIT_INTERVAL); count += SERVER_STATUS_CHANGE_WAIT_INTERVAL; @@ -393,33 +195,4 @@ public class ServerMain { private static void setSystemShutDown() { ServerMain.systemShutDown = true; } - - // private static int getPID(){ - // try { - // java.lang.management.RuntimeMXBean runtime = java.lang.management.ManagementFactory - // .getRuntimeMXBean(); - // java.lang.reflect.Field jvm = runtime.getClass() - // .getDeclaredField("jvm"); - // jvm.setAccessible(true); - // sun.management.VMManagement mgmt = (sun.management.VMManagement) jvm - // .get(runtime); - // java.lang.reflect.Method pid_method = mgmt.getClass() - // .getDeclaredMethod("getProcessId"); - // pid_method.setAccessible(true); - // - // int pid = (Integer) pid_method.invoke(mgmt); - // return pid; - // } catch (Exception e) { - // return -1; - // } - // } - - // getPID from ProcessHandle JDK 9 and onwards - private static long getPID() { - try { - return ProcessHandle.current().pid(); - } catch (Exception e) { - return -1; - } - } } diff --git a/airavata-api/src/main/java/org/apache/airavata/server/ThriftAPIServer.java b/airavata-api/src/main/java/org/apache/airavata/server/ThriftAPIServer.java new file mode 100644 index 0000000000..ca81c852e7 --- /dev/null +++ b/airavata-api/src/main/java/org/apache/airavata/server/ThriftAPIServer.java @@ -0,0 +1,223 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.airavata.server; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; + +import org.apache.airavata.api.Airavata; +import org.apache.airavata.api.server.handler.AiravataServerHandler; +import org.apache.airavata.common.utils.DBInitConfig; +import org.apache.airavata.common.utils.DBInitializer; +import org.apache.airavata.common.utils.IServer; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.cpi.CredentialStoreService; +import org.apache.airavata.credential.store.server.CredentialStoreServerHandler; +import org.apache.airavata.orchestrator.cpi.OrchestratorService; +import org.apache.airavata.orchestrator.server.OrchestratorServerHandler; +import org.apache.airavata.registry.api.RegistryService; +import org.apache.airavata.registry.api.service.handler.RegistryServerHandler; +import org.apache.airavata.registry.core.utils.AppCatalogDBInitConfig; +import org.apache.airavata.registry.core.utils.ExpCatalogDBInitConfig; +import org.apache.airavata.registry.core.utils.ReplicaCatalogDBInitConfig; +import org.apache.airavata.service.profile.groupmanager.cpi.GroupManagerService; +import org.apache.airavata.service.profile.handlers.GroupManagerServiceHandler; +import org.apache.airavata.service.profile.iam.admin.services.cpi.IamAdminServices; +import org.apache.airavata.service.profile.handlers.IamAdminServicesHandler; +import org.apache.airavata.service.profile.tenant.cpi.TenantProfileService; +import org.apache.airavata.service.profile.handlers.TenantProfileServiceHandler; +import org.apache.airavata.service.profile.user.cpi.UserProfileService; +import org.apache.airavata.service.profile.handlers.UserProfileServiceHandler; +import org.apache.airavata.service.profile.user.core.utils.UserProfileCatalogDBInitConfig; +import org.apache.airavata.sharing.registry.service.cpi.SharingRegistryService; +import org.apache.airavata.sharing.registry.server.SharingRegistryServerHandler; +import org.apache.airavata.sharing.registry.db.utils.SharingRegistryDBInitConfig; +import org.apache.thrift.TMultiplexedProcessor; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Airavata Thrift API Server that combines all Airavata services into a single server + * using the orchestrator's host and port configuration. + */ +public class ThriftAPIServer implements IServer { + private static final Logger logger = LoggerFactory.getLogger(ThriftAPIServer.class); + private static final String SERVER_NAME = "Airavata Thrift API"; + private static final String SERVER_VERSION = "1.0"; + + private ServerStatus status; + private TServer server; + + // Service names for multiplexing + public static final String AIRAVATA_API_SERVICE = "AiravataAPI"; + public static final String REGISTRY_SERVICE = "RegistryAPI"; + public static final String CREDENTIAL_STORE_SERVICE = "CredentialStore"; + public static final String SHARING_REGISTRY_SERVICE = "SharingRegistry"; + public static final String ORCHESTRATOR_SERVICE = "Orchestrator"; + public static final String USER_PROFILE_SERVICE = "UserProfile"; + public static final String TENANT_PROFILE_SERVICE = "TenantProfile"; + public static final String IAM_ADMIN_SERVICES = "IamAdminServices"; + public static final String GROUP_MANAGER_SERVICE = "GroupManager"; + + public ThriftAPIServer() { + setStatus(ServerStatus.STOPPED); + } + + @Override + public String getName() { + return SERVER_NAME; + } + + @Override + public String getVersion() { + return SERVER_VERSION; + } + + @Override + public void start() throws Exception { + try { + setStatus(ServerStatus.STARTING); + + // Initialize databases + logger.info("Initializing databases..."); + List<DBInitConfig> dbInitConfigs = Arrays.asList( + new ExpCatalogDBInitConfig(), + new AppCatalogDBInitConfig(), + new ReplicaCatalogDBInitConfig(), + new UserProfileCatalogDBInitConfig(), + new SharingRegistryDBInitConfig() + ); + + for (DBInitConfig dbInitConfig : dbInitConfigs) { + DBInitializer.initializeDB(dbInitConfig); + } + logger.info("Databases initialized successfully"); + + // Use orchestrator's host and port configuration + final int serverPort = Integer.parseInt(ServerSettings.getSetting("orchestrator.server.port", "8940")); + final String serverHost = ServerSettings.getSetting("orchestrator.server.host", "localhost"); + + // Create processors for each service + var airavataAPIProcessor = new Airavata.Processor<>(new AiravataServerHandler()); + var registryProcessor = new RegistryService.Processor<>(new RegistryServerHandler()); + var credentialStoreProcessor = new CredentialStoreService.Processor<>(new CredentialStoreServerHandler()); + var sharingRegistryProcessor = new SharingRegistryService.Processor<>(new SharingRegistryServerHandler()); + var orchestratorProcessor = new OrchestratorService.Processor<>(new OrchestratorServerHandler()); + var userProfileProcessor = new UserProfileService.Processor<>(new UserProfileServiceHandler()); + var tenantProfileProcessor = new TenantProfileService.Processor<>(new TenantProfileServiceHandler()); + var iamAdminServicesProcessor = new IamAdminServices.Processor<>(new IamAdminServicesHandler()); + var groupManagerProcessor = new GroupManagerService.Processor<>(new GroupManagerServiceHandler()); + + // Create multiplexed processor + TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor(); + + // Register all processors with their service names + multiplexedProcessor.registerProcessor(AIRAVATA_API_SERVICE, airavataAPIProcessor); + multiplexedProcessor.registerProcessor(REGISTRY_SERVICE, registryProcessor); + multiplexedProcessor.registerProcessor(CREDENTIAL_STORE_SERVICE, credentialStoreProcessor); + multiplexedProcessor.registerProcessor(SHARING_REGISTRY_SERVICE, sharingRegistryProcessor); + multiplexedProcessor.registerProcessor(ORCHESTRATOR_SERVICE, orchestratorProcessor); + multiplexedProcessor.registerProcessor(USER_PROFILE_SERVICE, userProfileProcessor); + multiplexedProcessor.registerProcessor(TENANT_PROFILE_SERVICE, tenantProfileProcessor); + multiplexedProcessor.registerProcessor(IAM_ADMIN_SERVICES, iamAdminServicesProcessor); + multiplexedProcessor.registerProcessor(GROUP_MANAGER_SERVICE, groupManagerProcessor); + + // Create server transport + TServerTransport serverTransport; + if (serverHost == null) { + serverTransport = new TServerSocket(serverPort); + } else { + InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort); + serverTransport = new TServerSocket(inetSocketAddress); + } + + // Create and start server + TThreadPoolServer.Args options = new TThreadPoolServer.Args(serverTransport); + options.minWorkerThreads = Integer.parseInt(ServerSettings.getSetting("orchestrator.server.min.threads", "30")); + server = new TThreadPoolServer(options.processor(multiplexedProcessor)); + + // Start server in background thread + new Thread(() -> { + server.serve(); + setStatus(ServerStatus.STOPPED); + logger.info("Airavata Thrift API Stopped."); + }).start(); + + // Monitor server startup + new Thread(() -> { + while (!server.isServing()) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + break; + } + } + if (server.isServing()) { + setStatus(ServerStatus.STARTED); + logger.info("Started Airavata Thrift API on {}:{}", serverHost, serverPort); + logger.info("Registered services: {}, {}, {}, {}, {}, {}, {}, {}, {}", + AIRAVATA_API_SERVICE, REGISTRY_SERVICE, CREDENTIAL_STORE_SERVICE, + SHARING_REGISTRY_SERVICE, ORCHESTRATOR_SERVICE, USER_PROFILE_SERVICE, + TENANT_PROFILE_SERVICE, IAM_ADMIN_SERVICES, GROUP_MANAGER_SERVICE); + } + }).start(); + + } catch (TTransportException e) { + logger.error("Failed to start Airavata Thrift API", e); + setStatus(ServerStatus.FAILED); + throw new Exception("Error while starting the Airavata Thrift API", e); + } + } + + @Override + public void stop() throws Exception { + if (server != null && server.isServing()) { + setStatus(ServerStatus.STOPING); + server.stop(); + } + } + + @Override + public void restart() throws Exception { + stop(); + start(); + } + + @Override + public void configure() throws Exception { + // Configuration handled in start method + } + + @Override + public ServerStatus getStatus() throws Exception { + return status; + } + + private void setStatus(ServerStatus stat) { + status = stat; + status.updateTime(); + } +} diff --git a/modules/ide-integration/.gitignore b/modules/ide-integration/.gitignore deleted file mode 100644 index 69c3559aa6..0000000000 --- a/modules/ide-integration/.gitignore +++ /dev/null @@ -1 +0,0 @@ -database_data/ diff --git a/modules/ide-integration/README.md b/modules/ide-integration/README.md deleted file mode 100644 index 40ab393d7d..0000000000 --- a/modules/ide-integration/README.md +++ /dev/null @@ -1,294 +0,0 @@ -# Apache Airavata - IDE Integration Setup - -<div align="center"> - <h3>🚀 Complete Development Environment Setup for IntelliJ IDEA</h3> - <p>Set up a full Airavata installation inside IntelliJ IDEA for seamless development</p> -</div> - ---- - -## 📋 Prerequisites - -Before starting, ensure you have the following installed on your system: - -| Tool | Version | Purpose | Installation Link | -|------|---------|---------|-------------------| -| 🐳 **Docker & Docker Compose** | Latest | Container orchestration | [Get Docker](https://docs.docker.com/compose/) | -| 💡 **IntelliJ IDEA** | Latest | IDE with Java 17+ | [Download IDEA](https://www.jetbrains.com/idea/download/) | -| ☕ **Java JDK** | 17+ | Runtime environment | [OpenJDK 17](https://openjdk.org/projects/jdk/17/) | -| 🔧 **Apache Maven** | 3.8+ | Build tool | [Install Maven](https://maven.apache.org/install.html) | -| 📝 **Git** | Latest | Version control | [Install Git](https://git-scm.com/downloads) | -| 🐍 **Python** | 3.8+ | Django portal | [Python.org](https://www.python.org/downloads/) | -| 📦 **Node.js & npm** | Latest LTS | Frontend build tools | [Node.js](https://nodejs.org/) | - -## 🏗️ Development Environment Setup - -### 1️⃣ Clone and Prepare Repository - -```bash -# Clone the main repository -git clone https://github.com/apache/airavata.git -cd airavata - -# Build the project (this may take a few minutes) -mvn clean install -DskipTests -``` - -### 2️⃣ Open in IntelliJ IDEA - -1. **Launch IntelliJ IDEA** -2. **Open Project** → Navigate to your cloned `airavata` directory -3. **Navigate to:** `modules` → `ide-integration` module - -## 🐳 Backend Services Setup - -### 3️⃣ Configure Host Resolution - -Add the following entry to your system's hosts file: - -**Linux/macOS:** `/etc/hosts` -**Windows:** `C:\Windows\System32\drivers\etc\hosts` - -```bash -127.0.0.1 airavata.host -``` - -### 4️⃣ Start Backend Services - -Navigate to the containers directory and start all required services: - -```bash -cd .devcontainer -docker-compose up -d -``` - -**Services Started:** -- 🗄️ **MySQL Database** -- 🔐 **Keycloak** (Authentication) -- 📨 **Apache Kafka** (Messaging) -- 🐰 **RabbitMQ** (Message Queue) -- 🔒 **SSHD Server** (Secure connections) - -### 5️⃣ Initialize Database - -Apply database migrations: - -```bash -cd .devcontainer -cat ./database_scripts/init/*-migrations.sql | docker exec -i containers-db-1 mysql -p123456 -``` - -## 🖥️ Starting Airavata Components - -### 6️⃣ Start API Server - -1. **Navigate to:** `org.apache.airavata.ide.integration.APIServerStarter` -2. **Right-click** in the editor -3. **Select:** `Run 'APIServerStarter.main()'` - -> 💡 **JDK 17+ Note:** Add this JVM argument in your run configuration: -> ``` -> --add-opens java.base/java.lang=ALL-UNNAMED -> ``` - -### 7️⃣ Start Job Execution Engine - -1. **Navigate to:** `org.apache.airavata.ide.integration.JobEngineStarter` -2. **Right-click** and select **Run** - -**Components Started:** -- 🔄 Helix Controller -- 👥 Helix Participant -- ⚙️ Pre Workflow Manager -- 📋 Post Workflow Manager - -### 8️⃣ Start Job Monitoring - -#### **Setup Email Monitor (One-Time Setup)** - -1. **Create a Gmail Account** [https://accounts.google.com/signup](https://accounts.google.com/signup) - -2. **Enable 2-Step Verification** [https://myaccount.google.com/security](https://myaccount.google.com/security) - -3. **Go to App Passwords** [https://myaccount.google.com/apppasswords](https://myaccount.google.com/apppasswords) - *(Make sure you're logged in and have already enabled 2-Step Verification.)* - -4. **Generate App Password:** - - Enter the name **"Airavata"** and click **"Generate"**. - -5. **Copy the Generated App Password** - - A 16 character password will appear **copy and save it immediately**, as it will not be shown again. - -5. **Update Configuration:** - Edit `src/main/resources/airavata-server.properties`: - ```properties - [email protected] - email.based.monitor.password=your-app-password - ``` - -6. **Start Monitor:** - - Navigate to: `org.apache.airavata.ide.integration.JobMonitorStarter` - - Right-click and **Run** - -## 🌐 User Portal Setup (Django) - -### 9️⃣ Django Portal Installation - -**You can create and launch experiments and manage credentials using this portal.** - -```bash -# Navigate outside the Airavata directory -cd .. - -# Clone the Django portal repository -git clone https://github.com/apache/airavata-portals.git -cd airavata-portals/airavata-django-portal - -# Create a virtual environment -python3 -m venv venv - -# Activate the virtual environment -source venv/bin/activate # For Windows: venv\Scripts\activate - -# Install Python dependencies -pip install -r requirements.txt - -### 🔟 Configure Django Portal - -```bash -# Create local settings -cp django_airavata/settings_local.py.ide django_airavata/settings_local.py - -# Run database migrations -python3 manage.py migrate - -# Build JavaScript components -./build_js.sh - -# Load default CMS pages -python3 manage.py load_default_gateway - -# Start development server -python3 manage.py runserver -``` - -### 🌍 Access User Portal - -- **URL:** [http://localhost:8000/auth/login](http://localhost:8000/auth/login) -- **Username:** `default-admin` -- **Password:** `123456` - -## 🛠️ Admin Portal Setup (Optional) - -For registering compute resources and storage resources: - -### 1️⃣ Starting Super Admin Portal (PGA) - -**This portal is required when registering new compute or storage resources into the gateway.** - -```bash -cd .devcontainer/pga -docker-compose up -d -``` - -### 2️⃣ Configure Host Resolution - -**Get host machine IP:** - -**macOS:** -```bash -docker-compose exec pga getent hosts docker.for.mac.host.internal | awk '{ print $1 }' -``` - -**Windows:** -```bash -docker-compose exec pga getent hosts host.docker.internal -``` - -**Update container hosts:** -*Replace <host-machine-ip> with the actual IP* -```bash -docker-compose exec pga /bin/sh -c "echo '<host-machine-ip> airavata.host' >> /etc/hosts" -``` - -### 3️⃣ Access Admin Portal - -- **URL:** [http://airavata.host:8008](http://airavata.host:8008) -- **Username:** `default-admin` -- **Password:** `123456` - -## 🛑 Cleanup & Troubleshooting - -### Stop All Services - -```bash -# In each docker-compose directory, run: -docker-compose down -docker-compose rm -f - -# Remove unused containers and networks -docker system prune -``` - -### 🔐 Certificate Renewal (If Expired) - -Only needed when Keycloak certificates expire: - -```bash -cd modules/ide-integration/src/main/resources/keystores - -# Remove old keystore -rm airavata.p12 - -# Generate new keystore (airavata.p12) -keytool -genkey -keyalg RSA -alias selfsigned -keystore airavata.p12 \ - -storetype pkcs12 -storepass airavata -validity 360 -keysize 2048 \ - -dname "CN=airavata.host,OU=airavata.host,O=airavata.host,L=airavata.host,ST=airavata.host,C=airavata.host" - -# Generate self-signed key-pair (for TLS) -openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt \ - -subj "/CN=airavata.host/OU=airavata.host/O=airavata.host/L=airavata.host/ST=airavata.host/C=airavata.host" \ - -addext "subjectAltName=DNS:airavata.host" -``` - -## 📊 Service Status Overview - -| Service | Port | Status Check | Purpose | -|---------|------|-------------|---------| -| 🗄️ **MySQL** | 3306 | `docker ps` | Database | -| 🔐 **Keycloak** | 8443 | [airavata.host:8443](http://airavata.host:8443) | Authentication | -| 📨 **Kafka** | 9092 | Internal | Messaging | -| 🐰 **RabbitMQ** | 5672 | Internal | Message Queue | -| 🌐 **Django Portal** | 8000 | [localhost:8000](http://localhost:8000) | User Interface | -| 🛠️ **PGA Admin** | 8008 | [airavata.host:8008](http://airavata.host:8008) | Admin Portal | - -## 🆘 Common Issues - -**Port Conflicts:** -```bash -# Check what's using a port -lsof -i :8000 -netstat -tulpn | grep :8000 -``` - -**Docker Issues:** -```bash -# Reset Docker -docker system prune -a -docker-compose down --volumes -``` - -**Build Failures:** -```bash -# Clean Maven cache -mvn clean -rm -rf ~/.m2/repository/org/apache/airavata -``` - ---- - -<div align="center"> - <strong>🎉 Happy Developing with Apache Airavata!</strong> - <br> - <em>Need help? Check our <a href="https://airavata.apache.org/mailing-list.html">mailing lists</a></em> -</div> \ No newline at end of file diff --git a/modules/ide-integration/pom.xml b/modules/ide-integration/pom.xml deleted file mode 100644 index 67857b0257..0000000000 --- a/modules/ide-integration/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>airavata</artifactId> - <groupId>org.apache.airavata</groupId> - <version>0.21-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>ide-integration</artifactId> - <name>Airavata IDE Integration</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-api</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.mariadb.jdbc</groupId> - <artifactId>mariadb-java-client</artifactId> - </dependency> - </dependencies> - - <build> - <resources> - <resource> - <directory>../../keystores</directory> - <targetPath>keystores</targetPath> - <includes> - <include>*.jks</include> - </includes> - </resource> - </resources> - </build> - -</project> diff --git a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/APIServerStarter.java b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/APIServerStarter.java deleted file mode 100644 index 1445c53877..0000000000 --- a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/APIServerStarter.java +++ /dev/null @@ -1,49 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.ide.integration; - -import org.apache.airavata.api.server.AiravataAPIServer; -import org.apache.airavata.credential.store.server.CredentialStoreServer; -import org.apache.airavata.db.event.manager.DBEventManagerRunner; -import org.apache.airavata.orchestrator.server.OrchestratorServer; -import org.apache.airavata.registry.api.service.RegistryAPIServer; -import org.apache.airavata.service.profile.server.ProfileServiceServer; -import org.apache.airavata.sharing.registry.server.SharingRegistryServer; - -public class APIServerStarter { - - public static void main(String[] args) throws Exception { - DBEventManagerRunner dbEventManagerRunner = new DBEventManagerRunner(); - RegistryAPIServer registryAPIServer = new RegistryAPIServer(); - CredentialStoreServer credentialStoreServer = new CredentialStoreServer(); - SharingRegistryServer sharingRegistryServer = new SharingRegistryServer(); - AiravataAPIServer airavataAPIServer = new AiravataAPIServer(); - OrchestratorServer orchestratorServer = new OrchestratorServer(); - ProfileServiceServer profileServiceServer = new ProfileServiceServer(); - - dbEventManagerRunner.start(); - registryAPIServer.start(); - credentialStoreServer.start(); - sharingRegistryServer.start(); - airavataAPIServer.start(); - orchestratorServer.start(); - profileServiceServer.start(); - } -} diff --git a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java deleted file mode 100644 index 297f8dcd88..0000000000 --- a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobEngineStarter.java +++ /dev/null @@ -1,73 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.ide.integration; - -import java.util.ArrayList; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.helix.core.AbstractTask; -import org.apache.airavata.helix.impl.controller.HelixController; -import org.apache.airavata.helix.impl.participant.GlobalParticipant; -import org.apache.airavata.helix.impl.workflow.PostWorkflowManager; -import org.apache.airavata.helix.impl.workflow.PreWorkflowManager; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; - -public class JobEngineStarter { - - public static void main(String args[]) throws Exception { - - ZkClient zkClient = new ZkClient( - ServerSettings.getZookeeperConnection(), - ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, - new ZNRecordSerializer()); - ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); - - zkHelixAdmin.addCluster(ServerSettings.getSetting("helix.cluster.name"), true); - - System.out.println("Starting Helix Controller ......."); - // Starting helix controller - HelixController controller = new HelixController(); - controller.startServer(); - - ArrayList<Class<? extends AbstractTask>> taskClasses = new ArrayList<>(); - - for (String taskClassName : GlobalParticipant.TASK_CLASS_NAMES) { - taskClasses.add(Class.forName(taskClassName).asSubclass(AbstractTask.class)); - } - - System.out.println("Starting Helix Participant ......."); - - // Starting helix participant - GlobalParticipant participant = new GlobalParticipant(taskClasses, null); - participant.startServer(); - - System.out.println("Starting Pre Workflow Manager ......."); - - PreWorkflowManager preWorkflowManager = new PreWorkflowManager(); - preWorkflowManager.startServer(); - - System.out.println("Starting Post Workflow Manager ......."); - - PostWorkflowManager postWorkflowManager = new PostWorkflowManager(); - postWorkflowManager.startServer(); - } -} diff --git a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobMonitorStarter.java b/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobMonitorStarter.java deleted file mode 100644 index 2068e4e38a..0000000000 --- a/modules/ide-integration/src/main/java/org/apache/airavata/ide/integration/JobMonitorStarter.java +++ /dev/null @@ -1,29 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.ide.integration; - -import org.apache.airavata.monitor.email.EmailBasedMonitor; - -public class JobMonitorStarter { - public static void main(String args[]) throws Exception { - EmailBasedMonitor emailBasedMonitor = new EmailBasedMonitor(); - emailBasedMonitor.startServer(); - } -} diff --git a/pom.xml b/pom.xml index edafd19681..e6e931c830 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,6 @@ under the License. <module>modules/restproxy</module> <module>modules/registry-db-migrator</module> <module>modules/registry-jpa-generator</module> - <module>modules/ide-integration</module> </modules> <properties>
