This is an automated email from the ASF dual-hosted git repository.
yasith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 6f0e750e4d Improve Batch Job Submission and Tracking in SDK (#553)
6f0e750e4d is described below
commit 6f0e750e4d9aa45b42c974d7e8048e85000845b6
Author: Yasith Jayawardana <[email protected]>
AuthorDate: Fri Aug 8 12:56:06 2025 -0500
Improve Batch Job Submission and Tracking in SDK (#553)
* improve sdk functions
* implement missing functions for batch sdk
* list actual runtimes from API. reorganize auth code. require cpus, nodes,
and walltime per run
* add file linking logic
* bump version to 2.2.0
* bump version to 2.2.1, simplify domain-specific app import
* commit app scripts into airavata-agent/application
* bump version to 2.2.2, fix bug in namd.sh
* fix bugs and bump version to 2.2.3
---
.../airavata_auth/device_auth.py | 9 +
.../airavata_experiments/__init__.py | 5 +-
.../airavata_experiments/airavata.py | 163 ++++++++-----
.../airavata_experiments/base.py | 45 ++--
.../airavata_experiments/plan.py | 27 ++-
.../airavata_experiments/runtime.py | 120 +++++++---
.../airavata_experiments/task.py | 12 +-
.../airavata_jupyter_magic/__init__.py | 118 ++++++----
.../airavata-python-sdk/airavata_sdk/__init__.py | 2 +-
dev-tools/airavata-python-sdk/pyproject.toml | 2 +-
.../airavata-agent/application/README.md | 19 ++
.../airavata-agent/application/alphafold2.sh | 91 ++++++++
.../airavata-agent/application/gaussian16.sh | 256 +++++++++++++++++++++
.../airavata-agent/application/gromacs.sh | 165 +++++++++++++
.../airavata-agent/application/namd.sh | 130 +++++++++++
.../airavata-agent/application/pmemd_cuda.sh | 4 +
.../airavata-agent/application/pmemd_mpi.sh | 4 +
.../airavata-agent/application/psi4.sh | 4 +
18 files changed, 1020 insertions(+), 156 deletions(-)
diff --git a/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py
b/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py
index 3037ab1702..944055ad26 100644
--- a/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py
+++ b/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py
@@ -10,6 +10,13 @@ from airavata_sdk import Settings
class AuthContext:
+
+ @staticmethod
+ def get_access_token():
+ if os.environ.get("CS_ACCESS_TOKEN", None) is None:
+ context = AuthContext()
+ context.login()
+ return os.environ["CS_ACCESS_TOKEN"]
def __init__(self):
self.settings = Settings()
@@ -21,6 +28,8 @@ class AuthContext:
self.console = Console()
def login(self):
+ if os.environ.get('CS_ACCESS_TOKEN', None) is not None:
+ return
# Step 1: Request device and user code
auth_device_url =
f"{self.settings.AUTH_SERVER_URL}/realms/{self.settings.AUTH_REALM}/protocol/openid-connect/auth/device"
response = requests.post(auth_device_url, data={
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py
b/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py
index dd391c07c4..bb5e33a0dd 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py
@@ -18,8 +18,9 @@ from __future__ import annotations
from . import base, plan
from airavata_auth.device_auth import AuthContext
-from .runtime import list_runtimes, Runtime
+from .runtime import find_runtimes, Runtime
from typing import Any
+from . import md, neuro
context = AuthContext()
@@ -27,7 +28,7 @@ context = AuthContext()
def login():
context.login()
-__all__ = ["list_runtimes", "base", "plan", "login"]
+__all__ = ["find_runtimes", "base", "plan", "login", "md", "neuro"]
def display_runtimes(runtimes: list[Runtime]) -> None:
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
index e6d7b10385..f5ca757de1 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
@@ -35,7 +35,9 @@ from airavata.model.security.ttypes import AuthzToken
from airavata.model.experiment.ttypes import ExperimentModel, ExperimentType,
UserConfigurationDataModel
from airavata.model.scheduling.ttypes import
ComputationalResourceSchedulingModel
from airavata.model.data.replica.ttypes import DataProductModel,
DataProductType, DataReplicaLocationModel, ReplicaLocationCategory
-from airavata.model.appcatalog.groupresourceprofile.ttypes import
GroupResourceProfile
+from airavata.model.appcatalog.groupresourceprofile.ttypes import
GroupResourceProfile, ResourceType
+from airavata.model.appcatalog.computeresource.ttypes import
ComputeResourceDescription
+from airavata.model.status.ttypes import JobStatus, JobState,
ExperimentStatus, ExperimentState
warnings.filterwarnings("ignore", category=DeprecationWarning)
logger = logging.getLogger("airavata_sdk.clients")
@@ -100,8 +102,10 @@ class AiravataOperator:
)
def get_resource_host_id(self, resource_name):
- resources: dict =
self.api_server_client.get_all_compute_resource_names(self.airavata_token) #
type: ignore
- return next((str(k) for k, v in resources.items() if v ==
resource_name))
+ resources =
self.api_server_client.get_all_compute_resource_names(self.airavata_token)
+ resource_id = next((k for k in resources if k.startswith(resource_name)),
None)
+ assert resource_id is not None, f"Compute resource {resource_name} not
found"
+ return resource_id
def configure_computation_resource_scheduling(
self,
@@ -188,7 +192,7 @@ class AiravataOperator:
"""
tree =
self.api_server_client.get_detailed_experiment_tree(self.airavata_token,
experiment_id) # type: ignore
processModels = tree.processes
- assert processModels is not None
+ assert processModels is not None, f"No process models found for experiment
{experiment_id}"
assert len(processModels) == 1, f"Expected 1 process model, got
{len(processModels)}"
return processModels[0].processId
@@ -213,7 +217,8 @@ class AiravataOperator:
sr_hostname = sr_hostname or self.default_sr_hostname()
# logic
sr_names: dict[str, str] =
self.api_server_client.get_all_storage_resource_names(self.airavata_token) #
type: ignore
- sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname))
+ sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname),
None)
+ assert sr_id is not None, f"Storage resource {sr_hostname} not found"
return
self.api_server_client.get_gateway_storage_preference(self.airavata_token,
gateway_id, sr_id)
def get_storage(self, storage_name: str | None = None) -> any: # type:
ignore
@@ -225,7 +230,8 @@ class AiravataOperator:
storage_name = storage_name or self.default_sr_hostname()
# logic
sr_names: dict[str, str] =
self.api_server_client.get_all_storage_resource_names(self.airavata_token) #
type: ignore
- sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name))
+ sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name),
None)
+ assert sr_id is not None, f"Storage resource {storage_name} not found"
storage = self.api_server_client.get_storage_resource(self.airavata_token,
sr_id)
return storage
@@ -236,11 +242,9 @@ class AiravataOperator:
"""
# logic
grps: list[GroupResourceProfile] =
self.api_server_client.get_group_resource_list(self.airavata_token,
self.default_gateway_id()) # type: ignore
- try:
- grp_id = next((grp.groupResourceProfileId for grp in grps if
grp.groupResourceProfileName == group))
- return str(grp_id)
- except StopIteration:
- raise Exception(f"Group resource profile {group} not found")
+ grp_id = next((grp.groupResourceProfileId for grp in grps if
grp.groupResourceProfileName == group), None)
+ assert grp_id is not None, f"Group resource profile {group} not found"
+ return str(grp_id)
def get_group_resource_profile(self, group_id: str):
grp =
self.api_server_client.get_group_resource_profile(self.airavata_token,
group_id) # type: ignore
@@ -253,7 +257,8 @@ class AiravataOperator:
"""
# logic
grps: list =
self.api_server_client.get_group_resource_list(self.airavata_token,
self.default_gateway_id()) # type: ignore
- grp_id = next((grp.groupResourceProfileId for grp in grps if
grp.groupResourceProfileName == group))
+ grp_id = next((grp.groupResourceProfileId for grp in grps if
grp.groupResourceProfileName == group), None)
+ assert grp_id is not None, f"Group resource profile {group} not found"
deployments =
self.api_server_client.get_application_deployments_for_app_module_and_group_resource_profile(self.airavata_token,
app_interface_id, grp_id)
return deployments
@@ -264,13 +269,15 @@ class AiravataOperator:
"""
gateway_id = str(gateway_id or self.default_gateway_id())
apps: list =
self.api_server_client.get_all_application_interfaces(self.airavata_token,
gateway_id) # type: ignore
- app_id = next((app.applicationInterfaceId for app in apps if
app.applicationName == app_name))
+ app_id = next((app.applicationInterfaceId for app in apps if
app.applicationName == app_name), None)
+ assert app_id is not None, f"Application interface {app_name} not found"
return str(app_id)
def get_project_id(self, project_name: str, gateway_id: str | None = None):
gateway_id = str(gateway_id or self.default_gateway_id())
projects: list =
self.api_server_client.get_user_projects(self.airavata_token, gateway_id,
self.user_id, 10, 0) # type: ignore
- project_id = next((p.projectID for p in projects if p.name == project_name
and p.owner == self.user_id))
+ project_id = next((p.projectID for p in projects if p.name == project_name
and p.owner == self.user_id), None)
+ assert project_id is not None, f"Project {project_name} not found"
return str(project_id)
def get_application_inputs(self, app_interface_id: str) -> list:
@@ -323,14 +330,14 @@ class AiravataOperator:
elif process_id is not None and agent_ref is not None:
assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}"
file = local_files[0]
- fp = os.path.join("/data", file.name)
+ fp = os.path.join(".", file.name)
rawdata = file.read_bytes()
b64data = base64.b64encode(rawdata).decode()
res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell",
json={
"agentId": agent_ref,
"envName": agent_ref,
"workingDir": ".",
- "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"]
+ "arguments": [f"echo {b64data} | base64 -d > {fp}"]
})
data = res.json()
if data["error"] is not None:
@@ -372,7 +379,7 @@ class AiravataOperator:
"agentId": agent_ref,
"envName": agent_ref,
"workingDir": ".",
- "arguments": ["sh", "-c", r"find /data -type d -name 'venv' -prune -o
-type f -printf '%P\n' | sort"]
+ "arguments": [r"find . -type f -printf '%P\n' | sort"]
})
data = res.json()
if data["error"] is not None:
@@ -405,12 +412,12 @@ class AiravataOperator:
"""
import os
- fp = os.path.join("/data", remote_file)
+ fp = os.path.join(".", remote_file)
res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell",
json={
"agentId": agent_ref,
"envName": agent_ref,
"workingDir": ".",
- "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"]
+ "arguments": [f"cat {fp} | base64 -w0"]
})
data = res.json()
if data["error"] is not None:
@@ -441,7 +448,36 @@ class AiravataOperator:
assert process_id is not None, f"Expected process_id, got {process_id}"
url_path = os.path.join(process_id, remote_file)
filemgr_svc_download_url =
f"{self.filemgr_svc_url()}/download/live/{url_path}"
-
+
+ def execute_cmd(self, agent_ref: str, cmd: str) -> bytes:
+ """
+ Execute a command on a remote directory of a storage resource
+ TODO add data_svc fallback
+
+ Return Path: /{project_name}/{experiment_name}
+
+ """
+ res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell",
json={
+ "agentId": agent_ref,
+ "envName": agent_ref,
+ "workingDir": ".",
+ "arguments": [f"{cmd} | base64 -w0"]
+ })
+ data = res.json()
+ if data["error"] is not None:
+ raise Exception(data["error"])
+ else:
+ exc_id = data["executionId"]
+ while True:
+ res =
requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}")
+ data = res.json()
+ if data["executed"]:
+ content = data["responseString"]
+ import base64
+ content = base64.b64decode(content)
+ return content
+ time.sleep(1)
+
def cat_file(self, process_id: str, agent_ref: str, sr_host: str,
remote_file: str, remote_dir: str) -> bytes:
"""
Download files from a remote directory of a storage resource to a local
directory
@@ -451,12 +487,12 @@ class AiravataOperator:
"""
import os
- fp = os.path.join("/data", remote_file)
+ fp = os.path.join(".", remote_file)
res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell",
json={
"agentId": agent_ref,
"envName": agent_ref,
"workingDir": ".",
- "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"]
+ "arguments": [f"cat {fp} | base64 -w0"]
})
data = res.json()
if data["error"] is not None:
@@ -596,8 +632,8 @@ class AiravataOperator:
def register_input_file(file: Path) -> str:
return str(self.register_input_file(file.name, sr_host, sr_id,
gateway_id, file.name, abs_path))
- # set up file inputs
- print("[AV] Setting up file inputs...")
+ # set up experiment inputs
+ print("[AV] Setting up experiment inputs...")
files_to_upload = list[Path]()
file_refs = dict[str, str | list[str]]()
for key, value in file_inputs.items():
@@ -610,11 +646,9 @@ class AiravataOperator:
file_refs[key] = [*map(register_input_file, value)]
else:
raise ValueError("Invalid file input type")
-
- # configure experiment inputs
experiment_inputs = []
for exp_input in
self.api_server_client.get_application_inputs(self.airavata_token,
app_interface_id): # type: ignore
- assert exp_input.type is not None
+ assert exp_input.type is not None, f"Invalid exp_input type for
{exp_input.name}: {exp_input.type}"
if exp_input.type < 3 and exp_input.name in data_inputs:
value = data_inputs[exp_input.name]
if exp_input.type == 0:
@@ -623,11 +657,12 @@ class AiravataOperator:
exp_input.value = repr(value)
elif exp_input.type == 3 and exp_input.name in file_refs:
ref = file_refs[exp_input.name]
- assert isinstance(ref, str)
+ assert isinstance(ref, str), f"Invalid file ref: {ref}"
exp_input.value = ref
elif exp_input.type == 4 and exp_input.name in file_refs:
exp_input.value = ','.join(file_refs[exp_input.name])
experiment_inputs.append(exp_input)
+ print(f"[AV] * {exp_input.name}={exp_input.value}")
experiment.experimentInputs = experiment_inputs
# configure experiment outputs
@@ -670,14 +705,15 @@ class AiravataOperator:
# wait until task begins, then get job id
print(f"[AV] Experiment {experiment_name} WAITING until task begins...")
job_id = job_state = None
- while job_state is None:
+ while job_id in [None, "N/A"]:
try:
job_id, job_state = self.get_task_status(ex_id)
except:
time.sleep(2)
else:
time.sleep(2)
- print(f"[AV] Experiment {experiment_name} - Task {job_state} with id:
{job_id}")
+ assert job_state is not None, f"Job state is None for job id: {job_id}"
+ print(f"[AV] Experiment {experiment_name} - Task {job_state.name} with id:
{job_id}")
return LaunchState(
experiment_id=ex_id,
@@ -688,14 +724,12 @@ class AiravataOperator:
sr_host=storage.hostName,
)
- def get_experiment_status(self, experiment_id: str) -> Literal['CREATED',
'VALIDATED', 'SCHEDULED', 'LAUNCHED', 'EXECUTING', 'CANCELING', 'CANCELED',
'COMPLETED', 'FAILED']:
- states = ["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING",
"CANCELING", "CANCELED", "COMPLETED", "FAILED"]
+ def get_experiment_status(self, experiment_id: str) -> ExperimentState:
status = self.api_server_client.get_experiment_status(self.airavata_token,
experiment_id)
- state = status.state.name
- if state in states:
- return state
- else:
- return "FAILED"
+ if status is None:
+ return ExperimentState.CREATED
+ assert isinstance(status, ExperimentStatus)
+ return status.state
def stop_experiment(self, experiment_id: str):
status = self.api_server_client.terminate_experiment(
@@ -781,23 +815,48 @@ class AiravataOperator:
print(f"[av] Remote execution failed! {e}")
return None
- def get_available_runtimes(self):
+ def get_available_groups(self, gateway_id: str = "default"):
+ grps: list[GroupResourceProfile] =
self.api_server_client.get_group_resource_list(self.airavata_token,
gatewayId=gateway_id)
+ return grps
+
+ def get_available_runtimes(self, group: str, gateway_id: str = "default"):
+ grps = self.get_available_groups(gateway_id)
+ grp_id, gcr_prefs, gcr_policies = next(((x.groupResourceProfileId,
x.computePreferences, x.computeResourcePolicies) for x in grps if
str(x.groupResourceProfileName).strip() == group.strip()), (None, None, None))
+ assert grp_id is not None, f"Group {group} was not found"
+ assert gcr_prefs is not None, f"Compute preferences for group={grp_id}
were not found"
+ assert gcr_policies is not None, f"Compute policies for group={grp_id}
were not found" # type: ignore
from .runtime import Remote
- return [
- Remote(cluster="login.expanse.sdsc.edu", category="gpu",
queue_name="gpu-shared", node_count=1, cpu_count=10, gpu_count=1, walltime=30,
group="Default"),
- Remote(cluster="login.expanse.sdsc.edu", category="cpu",
queue_name="shared", node_count=1, cpu_count=10, gpu_count=0, walltime=30,
group="Default"),
- Remote(cluster="anvil.rcac.purdue.edu", category="cpu",
queue_name="shared", node_count=1, cpu_count=24, gpu_count=0, walltime=30,
group="Default"),
- ]
+ runtimes = []
+ for pref in gcr_prefs:
+ cr = self.api_server_client.get_compute_resource(self.airavata_token,
pref.computeResourceId)
+ assert cr is not None, "Compute resource not found"
+ assert isinstance(cr, ComputeResourceDescription), "Compute resource is
not a ComputeResourceDescription"
+ assert cr.batchQueues is not None, "Compute resource has no batch queues"
+ for queue in cr.batchQueues:
+ if pref.resourceType == ResourceType.SLURM:
+ policy = next((p for p in gcr_policies if p.computeResourceId ==
pref.computeResourceId), None)
+ assert policy is not None, f"Compute resource policy not found for
{pref.computeResourceId}"
+ if queue.queueName not in (policy.allowedBatchQueues or []):
+ continue
+ runtime = Remote(
+ cluster=pref.computeResourceId.split("_")[0],
+ category="GPU" if "gpu" in queue.queueName.lower() else "CPU",
+ queue_name=queue.queueName,
+ node_count=queue.maxNodes or 1,
+ cpu_count=queue.cpuPerNode or 1,
+ gpu_count=1 if "gpu" in queue.queueName.lower() else 0,
+ walltime=queue.maxRunTime or 30,
+ group=group,
+ )
+ runtimes.append(runtime)
+ return runtimes
- def get_task_status(self, experiment_id: str) -> tuple[str,
Literal["SUBMITTED", "UN_SUBMITTED", "SETUP", "QUEUED", "ACTIVE", "COMPLETE",
"CANCELING", "CANCELED", "FAILED", "HELD", "SUSPENDED", "UNKNOWN"] | None]:
- states = ["SUBMITTED", "UN_SUBMITTED", "SETUP", "QUEUED", "ACTIVE",
"COMPLETE", "CANCELING", "CANCELED", "FAILED", "HELD", "SUSPENDED", "UNKNOWN"]
- job_details: dict =
self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) #
type: ignore
+ def get_task_status(self, experiment_id: str) -> tuple[str, JobState]:
+ job_details: dict[str, JobStatus] =
self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) #
type: ignore
job_id = job_state = None
- # get the most recent job id and state
for job_id, v in job_details.items():
- if v.reason in states:
- job_state = v.reason
- else:
- job_state = states[int(v.jobState)]
- return job_id or "N/A", job_state # type: ignore
+ job_state = v.jobState
+ return job_id or "N/A", job_state or JobState.UNKNOWN
+
+ JobState = JobState
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/base.py
b/dev-tools/airavata-python-sdk/airavata_experiments/base.py
index e9ad36b68e..2ba99a3bcb 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/base.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/base.py
@@ -85,49 +85,64 @@ class Experiment(Generic[T], abc.ABC):
self.resource = resource
return self
- def create_task(self, *allowed_runtimes: Runtime, name: str | None = None)
-> None:
+ def add_run(self, use: list[Runtime], cpus: int, nodes: int, walltime: int,
name: str | None = None, **extra_params) -> None:
"""
Create a task to run the experiment on a given runtime.
"""
- runtime = random.choice(allowed_runtimes) if len(allowed_runtimes) > 0
else self.resource
+ runtime = random.choice(use) if len(use) > 0 else self.resource
uuid_str = str(uuid.uuid4())[:4].upper()
-
+ # override runtime args with given values
+ runtime = runtime.model_copy()
+ runtime.args["cpu_count"] = cpus
+ runtime.args["node_count"] = nodes
+ runtime.args["walltime"] = walltime
+ # add extra inputs to task inputs
+ task_inputs = {**self.inputs, **extra_params}
+ # create a task with the given runtime and inputs
self.tasks.append(
Task(
- name=name or f"{self.name}_{uuid_str}",
+ name=f"{name or self.name}_{uuid_str}",
app_id=self.application.app_id,
- inputs={**self.inputs},
+ inputs=task_inputs,
runtime=runtime,
)
)
print(f"Task created. ({len(self.tasks)} tasks in total)")
- def add_sweep(self, *allowed_runtimes: Runtime, **space: list) -> None:
+ def add_sweep(self, use: list[Runtime], cpus: int, nodes: int, walltime:
int, name: str | None = None, **space: list) -> None:
"""
Add a sweep to the experiment.
"""
for values in product(space.values()):
- runtime = random.choice(allowed_runtimes) if len(allowed_runtimes) > 0
else self.resource
+ runtime = random.choice(use) if len(use) > 0 else self.resource
uuid_str = str(uuid.uuid4())[:4].upper()
-
+ # override runtime args with given values
+ runtime = runtime.model_copy()
+ runtime.args["cpu_count"] = cpus
+ runtime.args["node_count"] = nodes
+ runtime.args["walltime"] = walltime
+ # add sweep params to task inputs
task_specific_params = dict(zip(space.keys(), values))
agg_inputs = {**self.inputs, **task_specific_params}
task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in
self.input_mapping.items()}
-
+ # create a task with the given runtime and inputs
self.tasks.append(Task(
- name=f"{self.name}_{uuid_str}",
+ name=f"{name or self.name}_{uuid_str}",
app_id=self.application.app_id,
inputs=task_inputs,
runtime=runtime or self.resource,
))
- def plan(self, **kwargs) -> Plan:
- if len(self.tasks) == 0:
- self.create_task(self.resource)
+ def plan(self) -> Plan:
+ assert len(self.tasks) > 0, "add_run() must be called before plan() to
define runtimes and resources."
tasks = []
for t in self.tasks:
agg_inputs = {**self.inputs, **t.inputs}
task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in
self.input_mapping.items()}
- tasks.append(Task(name=t.name, app_id=self.application.app_id,
inputs=task_inputs, runtime=t.runtime))
- return Plan(tasks=tasks)
+ task = Task(name=t.name, app_id=self.application.app_id,
inputs=task_inputs, runtime=t.runtime)
+ # task.freeze() # TODO upload the task-related data and freeze the task
+ tasks.append(task)
+ plan = Plan(tasks=tasks)
+ plan.save()
+ return plan
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/plan.py
b/dev-tools/airavata-python-sdk/airavata_experiments/plan.py
index f231e2583d..4f41135610 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/plan.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/plan.py
@@ -25,6 +25,7 @@ from rich.progress import Progress
from .runtime import is_terminal_state
from .task import Task
import uuid
+from airavata_auth.device_auth import AuthContext
from .airavata import AiravataOperator
@@ -66,11 +67,13 @@ class Plan(pydantic.BaseModel):
statuses.append(task.status())
return statuses
- def __stage_stop__(self) -> None:
- print("Stopping task(s)...")
- for task in self.tasks:
- task.stop()
- print("Task(s) stopped.")
+ def __stage_stop__(self, runs: list[int] = []) -> None:
+ runs = runs if len(runs) > 0 else list(range(len(self.tasks)))
+ print(f"Stopping task(s): {runs}")
+ for i, task in enumerate(self.tasks):
+ if i in runs:
+ task.stop()
+ print(f"Task(s) stopped: {runs}")
def __stage_fetch__(self, local_dir: str) -> list[list[str]]:
print("Fetching results...")
@@ -78,7 +81,7 @@ class Plan(pydantic.BaseModel):
for task in self.tasks:
fps.append(task.download_all(local_dir))
print("Results fetched.")
- self.save_json(os.path.join(local_dir, "plan.json"))
+ self.export(os.path.join(local_dir, "plan.json"))
return fps
def launch(self, silent: bool = True) -> None:
@@ -119,17 +122,17 @@ class Plan(pydantic.BaseModel):
assert os.path.isdir(local_dir)
self.__stage_fetch__(local_dir)
- def stop(self) -> None:
- self.__stage_stop__()
+ def stop(self, runs: list[int] = []) -> None:
+ self.__stage_stop__(runs)
self.save()
- def save_json(self, filename: str) -> None:
+ def export(self, filename: str) -> None:
with open(filename, "w") as f:
json.dump(self.model_dump(), f, indent=2)
def save(self) -> None:
settings = Settings()
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
az = av.__airavata_token__(av.access_token, av.default_gateway_id())
assert az.accessToken is not None
assert az.claimsMap is not None
@@ -162,7 +165,7 @@ def load_json(filename: str) -> Plan:
def load(id: str | None) -> Plan:
settings = Settings()
assert id is not None
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
az = av.__airavata_token__(av.access_token, av.default_gateway_id())
assert az.accessToken is not None
assert az.claimsMap is not None
@@ -183,7 +186,7 @@ def load(id: str | None) -> Plan:
def query() -> list[Plan]:
settings = Settings()
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
az = av.__airavata_token__(av.access_token, av.default_gateway_id())
assert az.accessToken is not None
assert az.claimsMap is not None
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py
b/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py
index e843135843..f5c40e3c86 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py
@@ -16,14 +16,37 @@
from __future__ import annotations
import abc
-from typing import Any
+from typing import Any, Literal
from pathlib import Path
-import os
import pydantic
+from airavata_auth.device_auth import AuthContext
+
# from .task import Task
Task = Any
+States = Literal[
+ # Experiment States
+ 'CREATED',
+ 'VALIDATED',
+ 'SCHEDULED',
+ 'LAUNCHED',
+ 'EXECUTING',
+ 'CANCELING',
+ 'CANCELED',
+ 'COMPLETED',
+ 'FAILED',
+ # Job States
+ 'SUBMITTED',
+ 'QUEUED',
+ 'ACTIVE',
+ 'COMPLETE',
+ 'CANCELED',
+ 'FAILED',
+ 'SUSPENDED',
+ 'UNKNOWN',
+ 'NON_CRITICAL_FAIL',
+]
class Runtime(abc.ABC, pydantic.BaseModel):
@@ -36,6 +59,9 @@ class Runtime(abc.ABC, pydantic.BaseModel):
@abc.abstractmethod
def execute_py(self, libraries: list[str], code: str, task: Task) -> None:
...
+ @abc.abstractmethod
+ def execute_cmd(self, cmd: str, task: Task) -> bytes: ...
+
@abc.abstractmethod
def status(self, task: Task) -> tuple[str, str]: ...
@@ -87,6 +113,9 @@ class Mock(Runtime):
task.agent_ref = str(uuid.uuid4())
task.ref = str(uuid.uuid4())
+ def execute_cmd(self, cmd: str, task: Task) -> bytes:
+ return b""
+
def execute_py(self, libraries: list[str], code: str, task: Task) -> None:
pass
@@ -135,7 +164,7 @@ class Remote(Runtime):
print(f"[Remote] Creating Experiment: name={task.name}")
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
try:
launch_state = av.launch_experiment(
experiment_name=task.name,
@@ -158,6 +187,23 @@ class Remote(Runtime):
except Exception as e:
print(f"[Remote] Failed to launch experiment: {repr(e)}")
raise e
+
+ def execute_cmd(self, cmd: str, task: Task) -> bytes:
+ assert task.ref is not None
+ assert task.agent_ref is not None
+ assert task.pid is not None
+ assert task.sr_host is not None
+ assert task.workdir is not None
+
+ from .airavata import AiravataOperator
+ av = AiravataOperator(AuthContext.get_access_token())
+ try:
+ result = av.execute_cmd(task.agent_ref, cmd)
+ return result
+ except Exception as e:
+ print(f"[Remote] Failed to execute command: {repr(e)}")
+ return b""
+
def execute_py(self, libraries: list[str], code: str, task: Task) -> None:
assert task.ref is not None
@@ -165,29 +211,29 @@ class Remote(Runtime):
assert task.pid is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
result = av.execute_py(task.project, libraries, code, task.agent_ref,
task.pid, task.runtime.args)
print(result)
- def status(self, task: Task) -> tuple[str, str]:
+ def status(self, task: Task) -> tuple[str, States]:
assert task.ref is not None
assert task.agent_ref is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
# prioritize job state, fallback to experiment state
job_id, job_state = av.get_task_status(task.ref)
- if not job_state or job_state == "UN_SUBMITTED":
- return job_id, av.get_experiment_status(task.ref)
+ if job_state in [AiravataOperator.JobState.UNKNOWN,
AiravataOperator.JobState.NON_CRITICAL_FAIL]:
+ return job_id, av.get_experiment_status(task.ref).name
else:
- return job_id, job_state
+ return job_id, job_state.name
def signal(self, signal: str, task: Task) -> None:
assert task.ref is not None
assert task.agent_ref is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
av.stop_experiment(task.ref)
def ls(self, task: Task) -> list[str]:
@@ -198,7 +244,7 @@ class Remote(Runtime):
assert task.workdir is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
files = av.list_files(task.pid, task.agent_ref, task.sr_host, task.workdir)
return files
@@ -210,7 +256,7 @@ class Remote(Runtime):
assert task.workdir is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
result = av.upload_files(task.pid, task.agent_ref, task.sr_host, [file],
task.workdir).pop()
return result
@@ -222,7 +268,7 @@ class Remote(Runtime):
assert task.workdir is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
result = av.download_file(task.pid, task.agent_ref, task.sr_host, file,
task.workdir, local_dir)
return result
@@ -234,30 +280,44 @@ class Remote(Runtime):
assert task.workdir is not None
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
+ av = AiravataOperator(AuthContext.get_access_token())
content = av.cat_file(task.pid, task.agent_ref, task.sr_host, file,
task.workdir)
return content
-def list_runtimes(
+def find_runtimes(
cluster: str | None = None,
category: str | None = None,
- group: str | None = None,
node_count: int | None = None,
cpu_count: int | None = None,
- walltime: int | None = None,
+ group: str | None = None,
) -> list[Runtime]:
from .airavata import AiravataOperator
- av = AiravataOperator(os.environ['CS_ACCESS_TOKEN'])
- all_runtimes = av.get_available_runtimes()
- out_runtimes = []
- for r in all_runtimes:
- if (cluster in [None, r.args["cluster"]]) and (category in [None,
r.args["category"]]) and (group in [None, r.args["group"]]):
- r.args["node_count"] = node_count or r.args["node_count"]
- r.args["cpu_count"] = cpu_count or r.args["cpu_count"]
- r.args["walltime"] = walltime or r.args["walltime"]
- out_runtimes.append(r)
- return out_runtimes
-
-def is_terminal_state(x):
- return x in ["CANCELED", "COMPLETED", "FAILED"]
\ No newline at end of file
+ av = AiravataOperator(AuthContext.get_access_token())
+ grps = av.get_available_groups()
+ grp_names = [str(x.groupResourceProfileName) for x in grps]
+ if group is not None:
+ assert group in grp_names, f"Group {group} was not found. Available
groups: {repr(grp_names)}"
+ groups = [g for g in grps if str(g.groupResourceProfileName) == group]
+ else:
+ groups = grps
+ runtimes = []
+ for g in groups:
+ matched_runtimes = []
+ assert g.groupResourceProfileName is not None, f"Group {g} has no name"
+ r: Runtime
+ for r in av.get_available_runtimes(group=g.groupResourceProfileName):
+ if (node_count or 1) > int(r.args["node_count"]):
+ continue
+ if (cpu_count or 1) > int(r.args["cpu_count"]):
+ continue
+ if (cluster or r.args["cluster"]) != r.args["cluster"]:
+ continue
+ if (category or r.args["category"]) != r.args["category"]:
+ continue
+ matched_runtimes.append(r)
+ runtimes.extend(matched_runtimes)
+ return runtimes
+
+def is_terminal_state(x: States) -> bool:
+ return x in ["CANCELED", "COMPLETE", "COMPLETED", "FAILED"]
\ No newline at end of file
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/task.py
b/dev-tools/airavata-python-sdk/airavata_experiments/task.py
index bcda796518..fea221b1cf 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/task.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/task.py
@@ -72,7 +72,13 @@ class Task(pydantic.BaseModel):
assert self.ref is not None
from pathlib import Path
Path(local_dir).mkdir(parents=True, exist_ok=True)
- return self.runtime.download(file, local_dir, self)
+ try:
+ saved_path = self.runtime.download(file, local_dir, self)
+ print(f"[Remote] Downloaded {file} -> {saved_path}")
+ return saved_path
+ except Exception as e:
+ print(f"[Remote] Failed to download file: {repr(e)}")
+ return ""
def download_all(self, local_dir: str) -> list[str]:
assert self.ref is not None
@@ -92,6 +98,10 @@ class Task(pydantic.BaseModel):
def cat(self, file: str) -> bytes:
assert self.ref is not None
return self.runtime.cat(file, self)
+
+ def exec(self, cmd: str) -> bytes:
+ assert self.ref is not None
+ return self.runtime.execute_cmd(cmd, self)
def stop(self) -> None:
assert self.ref is not None
diff --git a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
index 1528304462..c445e654b2 100644
--- a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
+++ b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
@@ -43,6 +43,7 @@ from rich.live import Live
from jupyter_client.blocking.client import BlockingKernelClient
from airavata_auth.device_auth import AuthContext
+from airavata_experiments.plan import Plan
from airavata_sdk import Settings
# ========================================================================
@@ -62,6 +63,7 @@ class RequestedRuntime:
group: str
file: str | None
use: str | None
+ plan: str | None
class ProcessState(IntEnum):
@@ -101,6 +103,7 @@ RuntimeInfo = NamedTuple('RuntimeInfo', [
('envName', str),
('pids', list[int]),
('tunnels', dict[str, tuple[str, int]]),
+ ('links', dict[str, str]),
])
PENDING_STATES = [
@@ -139,23 +142,6 @@ class State:
# HELPER FUNCTIONS
-def get_access_token(envar_name: str = "CS_ACCESS_TOKEN", state_path: str =
"/tmp/av.json") -> str | None:
- """
- Get access token from environment or file
-
- @param None:
- @returns: access token if present, None otherwise
-
- """
- token = os.getenv(envar_name)
- if not token:
- try:
- token = json.load(Path(state_path).open("r")).get("access_token")
- except (FileNotFoundError, json.JSONDecodeError):
- pass
- return token
-
-
def is_runtime_ready(access_token: str, rt: RuntimeInfo, rt_name: str):
"""
Check if the runtime (i.e., agent job) is ready to receive requests
@@ -470,7 +456,8 @@ def submit_agent_job(
memory: int | None = None,
gpus: int | None = None,
gpu_memory: int | None = None,
- file: str | None = None,
+ spec_file: str | None = None,
+ plan_file: str | None = None,
) -> None:
"""
Submit an agent job to the given runtime
@@ -487,7 +474,8 @@ def submit_agent_job(
@param memory: the memory for cpu (MB)
@param gpus: the number of gpus (int)
@param gpu_memory: the memory for gpu (MB)
- @param file: environment file (path)
+ @param spec_file: environment file (path)
+ @param plan_file: experiment plan file (path)
@returns: None
"""
@@ -506,14 +494,14 @@ def submit_agent_job(
pip: list[str] = []
# if file is provided, validate it and use the given values as defaults
- if file is not None:
- fp = Path(file)
+ if spec_file is not None:
+ fp = Path(spec_file)
# validation
- assert fp.exists(), f"File {file} does not exist"
+ assert fp.exists(), f"File {spec_file} does not exist"
with open(fp, "r") as f:
- content = yaml.safe_load(f)
+ spec = yaml.safe_load(f)
# validation: /workspace
- assert (workspace := content.get("workspace", None)) is not None,
"missing section: /workspace"
+ assert (workspace := spec.get("workspace", None)) is not None,
"missing section: /workspace"
assert (resources := workspace.get("resources", None)) is not None,
"missing section: /workspace/resources"
assert (min_cpu := resources.get("min_cpu", None)) is not None,
"missing section: /workspace/resources/min_cpu"
assert (min_mem := resources.get("min_mem", None)) is not None,
"missing section: /workspace/resources/min_mem"
@@ -523,12 +511,29 @@ def submit_agent_job(
assert (datasets := workspace.get("data_collection", None)) is not
None, "missing section: /workspace/data_collection"
collection = models + datasets
# validation: /additional_dependencies
- assert (additional_dependencies :=
content.get("additional_dependencies", None)) is not None, "missing section:
/additional_dependencies"
+ assert (additional_dependencies := spec.get("additional_dependencies",
None)) is not None, "missing section: /additional_dependencies"
assert (modules := additional_dependencies.get("modules", None)) is
not None, "missing /additional_dependencies/modules section"
assert (conda := additional_dependencies.get("conda", None)) is not
None, "missing /additional_dependencies/conda section"
assert (pip := additional_dependencies.get("pip", None)) is not None,
"missing /additional_dependencies/pip section"
mounts = [f"{i['identifier']}:{i['mount_point']}" for i in collection]
+ # if plan file is provided, link its runs into the workspace
+ links = {}
+ if plan_file is not None:
+ assert Path(plan_file).exists(), f"Plan {plan_file} does not exist"
+ assert Path(plan_file).is_file(), f"Plan {plan_file} is not a file"
+ with open(Path(plan_file), "r") as f:
+ from airavata_experiments.base import Plan
+ plan = Plan(**json.load(f))
+ for task in plan.tasks:
+ plan_cluster = str(task.runtime.args.get("cluster", "N/A"))
+ if plan_cluster == "N/A":
+ print(f"[av] skipping plan task {task.name}: cluster not
specified in the plan")
+ continue
+ assert plan_cluster == cluster, f"[av] cluster mismatched:
{plan_cluster} in plan, {cluster} in request"
+ assert task.pid is not None, f"[av] plan task {task.name} has no
pid"
+ links[task.pid] = task.name
+
# payload
data = {
'experimentName': app_name,
@@ -554,6 +559,7 @@ def submit_agent_job(
print(f"* libraries={data['libraries']}", flush=True)
print(f"* pip={data['pip']}", flush=True)
print(f"* mounts={data['mounts']}", flush=True)
+ print(f"* links={links}", flush=True)
# Send the POST request
headers = generate_headers(access_token, gateway_id)
@@ -584,6 +590,7 @@ def submit_agent_job(
envName=obj['envName'],
pids=[],
tunnels={},
+ links=links,
)
state.all_runtimes[rt_name] = rt
print(f'Requested runtime={rt_name}', flush=True)
@@ -1114,7 +1121,7 @@ def request_runtime(line: str):
Request a runtime with given capabilities
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
[rt_name, *cmd_args] = line.strip().split()
@@ -1151,12 +1158,32 @@ def request_runtime(line: str):
p.add_argument("--group", type=str, help="resource group", required=False,
default="Default")
p.add_argument("--file", type=str, help="yml file", required=False)
p.add_argument("--use", type=str, help="allowed resources", required=False)
+ p.add_argument("--plan", type=str, help="experiment plan file",
required=False)
args = p.parse_args(cmd_args, namespace=RequestedRuntime())
if args.file is not None:
- assert args.use is not None
- cluster, queue = meta_scheduler(args.use.split(","))
+ assert (args.use or args.plan) is not None
+ if args.use:
+ cluster, queue = meta_scheduler(args.use.split(","))
+ else:
+ assert args.plan is not None, "--plan is required when --use is not
provided"
+ assert os.path.exists(args.plan), f"--plan={args.plan} file does not
exist"
+ assert os.path.isfile(args.plan), f"--plan={args.plan} is not a file"
+ with open(args.plan, "r") as f:
+ plan: Plan = Plan(**json.load(f))
+ clusters = []
+ queues = []
+ for task in plan.tasks:
+ c, q = task.runtime.args.get("cluster"),
task.runtime.args.get("queue_name")
+ clusters.append(c)
+ queues.append(q)
+ assert len(set(clusters)) == 1, "all tasks must be on the same
cluster"
+ assert len(set(queues)) == 1, "all tasks must be on the same queue"
+ cluster, queue = clusters[0], queues[0]
+ assert cluster is not None, "cluster is required"
+ assert queue is not None, "queue is required"
+
submit_agent_job(
rt_name=rt_name,
access_token=access_token,
@@ -1166,7 +1193,8 @@ def request_runtime(line: str):
cluster=cluster,
queue=queue,
group=args.group,
- file=args.file,
+ spec_file=args.file,
+ plan_file=args.plan,
)
else:
assert args.cluster is not None
@@ -1194,7 +1222,7 @@ def stat_runtime(line: str):
Show the status of the runtime
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt_name = line.strip()
@@ -1226,7 +1254,7 @@ def wait_for_runtime(line: str):
rt_name, render_live_logs = parts[0], True
else:
raise ValueError("Usage: %wait_for_runtime <rt> [--live]")
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt = state.all_runtimes.get(rt_name, None)
@@ -1238,8 +1266,14 @@ def wait_for_runtime(line: str):
random_port = random.randint(2000, 6000) * 5
launch_remote_kernel(rt_name, random_port, hostname="127.0.0.1")
print(f"Remote Jupyter kernel launched and connected for
runtime={rt_name}.")
- return
-
+
+ # create symlinks
+ for pid, name in rt.links.items():
+ try:
+ state.kernel_clients[rt_name].execute(f"!ln -s ../{pid} {name}",
silent=True, store_history=False)
+ print(f"[av] linked ../{pid} -> {name}")
+ except Exception as e:
+ print(f"[av] failed to link ../{pid} -> {name}: {e}")
@register_line_magic
def run_subprocess(line: str):
@@ -1247,7 +1281,7 @@ def run_subprocess(line: str):
Run a subprocess asynchronously
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt_name = state.current_runtime
@@ -1279,7 +1313,7 @@ def kill_subprocess(line: str):
Kill a running subprocess asynchronously
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt_name = state.current_runtime
@@ -1308,7 +1342,7 @@ def open_tunnels(line: str):
Open tunnels to the runtime
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt_name = state.current_runtime
@@ -1348,7 +1382,7 @@ def close_tunnels(line: str):
Close tunnels to the runtime
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt_name = state.current_runtime
@@ -1374,7 +1408,7 @@ def restart_runtime(rt_name: str):
Restart the runtime
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt = state.all_runtimes.get(rt_name, None)
@@ -1389,7 +1423,7 @@ def stop_runtime(rt_name: str):
Stop the runtime
"""
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt = state.all_runtimes.get(rt_name, None)
@@ -1457,7 +1491,7 @@ def launch_remote_kernel(rt_name: str, base_port: int,
hostname: str):
Launch a remote Jupyter kernel, open tunnels, and connect a local Jupyter
client.
"""
assert ipython is not None
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
# launch kernel and tunnel ports
@@ -1535,7 +1569,7 @@ def open_web_terminal(line: str):
cmd = f"ttyd -p {random_port} -i 0.0.0.0 --writable bash"
# Get access token
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
if access_token is None:
print("Not authenticated. Please run %authenticate first.")
return
@@ -1667,7 +1701,7 @@ async def run_cell_async(
return await orig_run_code(raw_cell, store_history, silent,
shell_futures, transformed_cell=transformed_cell,
preprocessing_exc_tuple=preprocessing_exc_tuple, cell_id=cell_id)
else:
# Validation: check runtime is ready and kernel is started
- access_token = get_access_token()
+ access_token = AuthContext.get_access_token()
assert access_token is not None
rt_info = state.all_runtimes.get(rt, None)
if rt_info is None:
diff --git a/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py
b/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py
index 546c3bcd67..0a559918e8 100644
--- a/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py
+++ b/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py
@@ -164,7 +164,7 @@ class Settings:
@property
def STORAGE_RESOURCE_HOST(self):
- return str(os.getenv("STORAGE_RESOURCE_HOST", "cybershuttle.org"))
+ return str(os.getenv("STORAGE_RESOURCE_HOST",
"gateway.cybershuttle.org"))
@property
def SFTP_PORT(self):
diff --git a/dev-tools/airavata-python-sdk/pyproject.toml
b/dev-tools/airavata-python-sdk/pyproject.toml
index d6fa91b41e..d995930f1b 100644
--- a/dev-tools/airavata-python-sdk/pyproject.toml
+++ b/dev-tools/airavata-python-sdk/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "airavata-python-sdk"
-version = "2.1.7"
+version = "2.2.3"
description = "Apache Airavata Python SDK"
readme = "README.md"
license = "Apache-2.0"
diff --git a/modules/agent-framework/airavata-agent/application/README.md
b/modules/agent-framework/airavata-agent/application/README.md
new file mode 100644
index 0000000000..f57bb9b204
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/README.md
@@ -0,0 +1,19 @@
+# Applications Scripts - Agent
+
+## NAMD
+
+### Expanse
+```sh
+NAMD_CPU_MODULES="cpu/0.17.3b gcc/10.2.0 openmpi/4.1.1"
+NAMD_CPU_PATH=/home/scigap/applications/NAMD_3.1alpha2_Linux-x86_64-multicore
+NAMD_GPU_MODULES="gpu/0.17.3b"
+NAMD_GPU_PATH=/home/scigap/applications/NAMD_3.0.1_Linux-x86_64-multicore-CUDA
+```
+
+### Delta
+```sh
+NAMD_CPU_MODULES="openmpi/4.1.6 fftw/3.3.10"
+NAMD_CPU_PATH=/sw/namd/NAMD_3.0b3_Linux-x86_64-multicore
+NAMD_GPU_MODULES="namd3/2024.02.mulitcore_cuda.s11"
+NAMD_GPU_PATH=/sw/namd/NAMD_3.0b3_Linux-x86_64-multicore-CUDA
+```
diff --git a/modules/agent-framework/airavata-agent/application/alphafold2.sh
b/modules/agent-framework/airavata-agent/application/alphafold2.sh
new file mode 100644
index 0000000000..2d1318ffe0
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/alphafold2.sh
@@ -0,0 +1,91 @@
+#!/bin/bash -x
+set -euo pipefail
+
+while getopts t:p:m: option
+ do
+ case $option in
+ t ) MaxDate=$OPTARG ;;
+ p ) MODEL_PRESET=$OPTARG ;;
+ m ) Num_Multi=$OPTARG ;;
+ \? ) cat << ENDCAT1
+>! Usage: $0 [-t Maximum Template Date ] !<
+>! [-p Model Preset ] !<
+>! [-m Number of Multimers per Model ] !<
+ENDCAT1
+# exit 1 ;;
+ esac
+done
+
+if [ $Num_Multi = "" ]; then
+ export Num_Multi=1
+fi
+#set the environment PATH
+export PYTHONNOUSERSITE=True
+module reset
+module load singularitypro
+ALPHAFOLD_HOME=/expanse/projects/qstore/data/alphafold-v2.3.2
+ALPHAFOLD_MODELS=$ALPHAFOLD_HOME/params
+
+pdb70=""
+uniprot=""
+pdbseqres=""
+nummulti=""
+
+# check_flags
+ if [ "monomer" = "${MODEL_PRESET%_*}" ];then
+ export pdb70="--pdb70_database_path=/data/pdb70/pdb70"
+ else
+ export uniprot="--uniprot_database_path=/data/uniprot/uniprot.fasta"
+ export
pdbseqres="--pdb_seqres_database_path=/data/pdb_seqres/pdb_seqres.txt"
+ export nummulti="--num_multimer_predictions_per_model=$Num_Multi"
+ fi
+
+## Copy input to node local scratch
+cp input.fasta /scratch/$USER/job_$SLURM_JOBID
+#cp -r /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08
/scratch/$USER/job_$SLURM_JOBID/
+cd /scratch/$USER/job_$SLURM_JOBID
+ln -s /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08
+mkdir bfd
+cp /expanse/projects/qstore/data/alphafold/bfd/*index bfd/
+#cp
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata
bfd/
+#cp
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata
bfd/
+cd bfd
+ln -s
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata
+ln -s
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata
+ln -s
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_a3m.ffdata
+cd ../
+mkdir alphafold_output
+# Create soft links to rundir from submitdir
+
+ln -s /scratch/$USER/job_$SLURM_JOBID $SLURM_SUBMIT_DIR/rundir
+
+#Run the command
+singularity run --nv \
+ -B /expanse/lustre \
+ -B /expanse/projects \
+ -B /scratch \
+ -B $ALPHAFOLD_HOME:/data \
+ -B $ALPHAFOLD_MODELS \
+ /cm/shared/apps/containers/singularity/alphafold/alphafold_aria2_v2.3.2.simg \
+ --fasta_paths=/scratch/$USER/job_$SLURM_JOBID/input.fasta \
+ --uniref90_database_path=/data/uniref90/uniref90.fasta \
+ --data_dir=/data \
+ --mgnify_database_path=/data/mgnify/mgy_clusters_2022_05.fa \
+
--bfd_database_path=/scratch/$USER/job_$SLURM_JOBID/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt
\
+ --uniref30_database_path=/data/uniref30/UniRef30_2021_03 \
+ $pdbseqres \
+ $pdb70 \
+ $uniprot \
+ --template_mmcif_dir=/data/pdb_mmcif/mmcif_files \
+ --obsolete_pdbs_path=/data/pdb_mmcif/obsolete.dat \
+ --output_dir=/scratch/$USER/job_$SLURM_JOBID/alphafold_output \
+ --max_template_date=$MaxDate \
+ --model_preset=$MODEL_PRESET \
+ --use_gpu_relax=true \
+ --models_to_relax=best \
+ $nummulti
+
+unlink $SLURM_SUBMIT_DIR/rundir
+
+### Copy back results
+tar -cvf $SLURM_SUBMIT_DIR/alphafold_output.tar alphafold_output
diff --git a/modules/agent-framework/airavata-agent/application/gaussian16.sh
b/modules/agent-framework/airavata-agent/application/gaussian16.sh
new file mode 100644
index 0000000000..f4089d116a
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/gaussian16.sh
@@ -0,0 +1,256 @@
+#!/bin/sh -x
+# $Id: run_g09_chk_recovery.sh,v 1.0 2017/04/22 14:15:00 Sudhakar Exp $
+
+if [ $# -lt 1 -o $# -gt 3 ]; then
+ echo 1>&2 "Usage: $0 gaussian_input_gjf [SEAGrid_UserName] [GPU] "
+ #echo 1>&2 "Usage: $0 subdir gaussian_input_gjf clobber [gcvars]"
+ exit 127
+fi
+
+# subdir depends on whether we're doing freq, water or PES. For freq and water,
+# it should be hardcoded in the Xbaya workflow. For PES, it should be an
+# additional array generated by the frontend. The contents of this array are
+# trivial, but creating an extra Xbaya service to generate it would add
+# unnecessary extra complexity. Besides, the frontend cannot avoid having to
+# pass at least one array: the array with gjf files.
+#subdir=$1
+subdir="$PWD"
+#export GAUSS_SCRDIR=/oasis/scratch/comet/$USER/temp_project/$SLURM_JOBID
+#export GAUSS_SCRDIR=/expanse/lustre/scratch/$USER/temp_project/$SLURM_JOBID
+scratch_subid=$(id -u $user | tail -c2)
+scrdir="/storage/scratch1/$scratch_subid/$USER"
+export GAUSS_SCRDIR=$scrdir/$SLURM_JOBID
+mkdir -p $GAUSS_SCRDIR
+gaussian_input_full=$1
+if [ $AIRAVATA_USERNAME ]; then
+ echo " The Airavata Gateway User is $AIRAVATA_USERNAME"
+ SG_UserName="$AIRAVATA_USERNAME"
+#elif [ $2 ]; then
+# SG_UserName=$2
+else
+ echo " The Airavata Gateway User is missing "
+exit
+fi
+
+ #export
PATH="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16:$PATH"
+ #export
GAUSS_EXEDIR="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16"
+ #. ~/.bash_profile
+#if [ "$2" = "GPU" ]; then
+ echo "Using GPU version of Gaussian 16"
+ #module reset; module load gpu/0.15.4 gaussian/16.C.01-cuda
+ #module load gaussian/16.C.02
+ #export
PATH="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16:$PATH"
+ #export
GAUSS_EXEDIR="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16"
+ #. ~/.bash_profile
+#fi
+
+if [ $AIRAVATA_ExptDataDir ]; then
+ echo "The Airavata Storage Directory for this job is $AIRAVATA_ExptDataDir"
+ echo "Preparing Cron to push log data to storage periodically"
+ # Get Slurm total time
+# flds=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' | awk -F:
'{print NF}'`
+# flds=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' | awk -F-
'{print NF}'` + $flds
+# # if flds 3 $1 is hrs $2 is min and $3 is sec
+# if [ $flds = 4 ]; then
+# jdys=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F-
'{print $1}'`
+# hrmnse=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F-
'{print $2}'`
+# jhrs=`echo $hrmnse | awk -F: '{print $1}'`+24*$jdys
+# jmin=`echo $hrmnse | awk -F: '{print $2}'`+60*$jhrs
+# elif [ $flds = 3 ]; then
+# jhrs=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F:
'{print $1}'`
+# jmin=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F:
'{print $2}'`+60*$jhrs
+# elif [ $flds = 2 ]; then
+# jmin=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F:
'{print $1}'`
+# fi
+# if [ $jhrs .gt. 5 ]; then
+# upd=30
+# else
+# upd=$jmin/10
+# fi
+ # For 5hrs and above uperiod is 30 min and for less than 5 hrs it is 10%
of total time
+ # compute uperiod
+ # prepare or use the prepared update-storage script
+ # i(crontab -l 2>/dev/null; echo "*/$uperiod * * * * /path/to/job -with
args") | crontab - */30 * * * * /path/to/command
+ # Use crontab -r to remove it after the job is finished or ended ( under
trap)
+# mycron=jcron$$
+# #write out current crontab
+# crontab -l > $mycron
+# #echo new cron into cron file
+# #echo "*/$upd * * * * /home/gridchem/bin/joblogscp.sh >/dev/null 2>&1"
>> $mycron
+# echo "*/$upd * * * * scp *.log
[email protected]:$AIRAVATA_ExptDataDir >/dev/null 2>&1" >> $mycron
+# #echo "* * * * * sleep 20; /Users/spamidig1/bin/testscp.sh >/dev/null
2>&1" >> $mycron
+##install new cron file
+# crontab $mycron
+# crontab -l
+# # delete this entry at the end of the job or trap...
+fi
+inputs=`echo $AIRAVATA_INPUTS | sed 's/[//'`
+inputs=`echo $inputs | sed 's/]//'`
+echo "Airavata Inputs: $inputs"
+cd $subdir
+dos2unix -k $gaussian_input_full
+gaussian_input=${gaussian_input_full##*/}
+gaussian_output=${gaussian_input%.*}.log
+clobber="$3" # set to "" for rerun or debug; otherwise set to 1
+gcvars=$4
+null=""
+
+# Next line will usually return "cp: `GCVARS' and `GCVARS' are the same file"
+#if [ "$gcvars" ] ; then cp -p $gcvars GCVARS 2>/dev/null ; fi
+
+#if [ ! "$LOCAL_LOCATION" ] ; then
+# if [ -s ~/.paramchemlocation ] ; then
+# read LOCAL_LOCATION < ~/.paramchemlocation
+# . $LOCAL_LOCATION/environment
+# fi
+# fi
+#if [ ! -d "$LOCAL_LOCATION" ] ; then
+# echo "Warning: no valid LOCAL_LOCATION found" >&2
+#gauss_mem=56GB
+#gauss_nproc=24
+## Escaping spaces rather than quoting because quoting prevents tilde expansion
+##charmm_location=~gridchem/workflow_script/sys_exec/local/c36a6-132-serial\
-chsize\ 25140
+#SCRIPT_LOCATION=~gridchem/workflow_script/sys_exec/tools
+#LOCAL_LOCATION=~gridchem/workflow_script/sys_exec/local-comet
+#export CONPATH=$SCRIPT_LOCATION
+localarc="$HOME/scratch"
+##globalarc="[email protected]:/home/ccguser/mss/internal/$SGUserName"
+## exit 1
+# fi
+#. $LOCAL_LOCATION/environment
+
+#read GC_UserName GC_ProjectName GC_WorkflowName TIMESTAMP < GCVARS
+usrchkdir=$localarc/${SG_UserName}/
+echo " The Airavata Gateway User Directory is $usrchkdir"
+copysubdir="./"
+mkdir -p $usrchkdir
+#copysubdir=$localarc/${GC_UserName}/${GC_ProjectName}/${GC_WorkflowName}/$subdir
+# The way a "false" boolean variable is passed seems to be unstable; it's
+# been "", "0" and "false", so we try to cover all reasonable possibilities.
+if [ ! "$clobber" ] ; then
+ clobber=0
+ fi
+if [ "$clobber" = "0" -o "$clobber" = "false" -o "$clobber" = "no" ] ; then
+ if [ -s $copysubdir/$gaussian_output ] ; then
+ echo gaussian_output_log=$copysubdir/$gaussian_output
+ exit 0
+ fi
+ gaussian_output_full=${gaussian_input_full%.*}.log
+ if [ -s $gaussian_output_full ] ; then
+ #mkdir -p $copysubdir
+ rm -f $copysubdir/$gaussian_output # clean up symlink if something went
wrong earlier
+ if [ $gaussian_input_full -nt $copysubdir/$gaussian_input ] ; then sed
's/\r$//' $gaussian_input_full > $copysubdir/$gaussian_input ; fi
+ cp -up $gaussian_output_full $copysubdir
+ echo gaussian_output_log=$gaussian_output_full
+ exit 0
+ fi
+ fi
+
+#Process inputfile for run files and other job requirements
+ # PROCESS CHECKPOINT FILE
+ # Check to see if the checkpoint file is given a name in input deck
+ # Input file to look into
+ dummy="$gaussian_input_full"
+ #dummy="$Diskinputdir/$Diskinputfile"
+ checkpt="no"
+ junk=`/usr/bin/head -5 $dummy | /bin/grep -i "%chk"`
+ if [ "$junk" != "" ]; then
+ junk=`echo $junk | /bin/sed 's/=/@/'`
+ junk=`echo $junk | /bin/sed 's/ //'`
+ #
+ # Be careful: Don't lose the subdirectory information for CHKPT file
+ # Also, add .chk if there is no extension to the Checkpoint file
+ #
+ Chkfile=`expr $junk : '.*@\(.*\)'`
+ Chkdir="$Diskinputdir"
+ Chkfile=`/bin/basename $Chkfile`
+ dummy=`expr $Chkfile : '.*\(\..*\)'`
+ Chkfile=`/bin/basename $Chkfile $dummy`
+ ChkfileWNE="$Chkfile"
+ Chkfile="$Chkfile${dummy:-.chk}"
+ Chkfile=`echo $Chkfile | sed "s/^M//"`
+ # 2005/12/08 create name for $formated_chkfile
+ formated_chkfile="$ChkfileWNE.fchk"
+ Chkfile_with_arch="${Chkfile}_$architecture"
+ echo "DEBUG: checkfile = $Chkfile and formated_chkfile = $formated_chkfile
";
+ checkpt="yes"
+#Retrieve the checkpoint file from the user archive directory
+ if [ -f "$usrchkdir/$Chkfile" ]; then
+ cp $usrchkdir/$Chkfile .
+ fi
+ export PJobID=`grep -i localjobid $gaussian_input_full | awk -F= '{print $2}' | sed "s/^M//"`
+# /bin/cat >> $qsubin << HERE2
+ #export PJobID=`grep -i localjobid $Diskinputfile | awk -F= '{print $2}' |
sed "s/^M//"`
+ #cd /oasis/scratch/comet/gridchem/temp_project/$PJobID
+ #cd \$SCRATCH_BATCH
+ ##if [ ${PJobID:-null} != "$null" ]; then
+ if [ "${PJobID}" != "" ]; then
+ #cp -r /work/ccguser/batch_scratch/$PJobID*/* .
+ #cp -r /oasis/scratch/comet/gridchem/temp_project/$PJobID*/* .
+ cp -r $HOME/scratch/$PJobID*/* .
+ ls -l
+ fi
+ else
+ echo "******** NO CHECKPOINT FILE IDENTIFIED ******** "
+ fi
+
+mkdir -p $copysubdir
+mkdir -p $subdir
+cd $subdir
+cwd=`pwd`
+if [ $gaussian_input_full -nt $copysubdir/$gaussian_input ] ; then sed
's/\r$//' $gaussian_input_full > $copysubdir/$gaussian_input ; fi
+cd $copysubdir
+rm -f $gaussian_output
+if [ "$cwd" != "$subdir" ]; then
+ ln -s $cwd/$gaussian_output $subdir/$gaussian_output
+fi
+cd $cwd
+if [ $gaussian_input_full -nt $gaussian_input ] ; then sed 's/\r$//'
$gaussian_input_full > $gaussian_input ; fi
+signals_to_trap="XCPU INT TERM CHLD"
+#trap "| grep -v $AIRAVATA_ExptDataDir | crontab -; rm -rf $mycron; cp -p
$gaussian_output $copysubdir; cp -p $Chkfile $copysubdir; exit 99"
$signals_to_trap
+#trap "crontab -l | grep -v $AIRAVATA_ExptDataDir | crontab -; rm -rf $mycron;
cp -p $gaussian_output $copysubdir; cp -p $Chkfile $copysubdir; exit 99"
$signals_to_trap
+cd $HOME/scratch
+ln -s $subdir $SLURM_JOBID
+ls -l $SLURM_JOBID/
+cd $cwd
+#$LOCAL_LOCATION/run_gaussian_local.sh $gaussian_input $gaussian_output
+which g16
+g16 $gaussian_input $gaussian_output
+#BEGIN{while(getline < infile) if ($0 ~ "^ *--[lL][iI][nN][kK]1--") nlink++}
+
+#if awk -v infile=$gaussian_input '
+# BEGIN{while(getline < infile) if ($0 ~ "^ *[lL][iI][nN][kK]1") nlink++}
+# /^ *Normal termination of Gaussian/{nnormal++}
+# END{if (nnormal == nlink+1) exit 1}' $gaussian_output ; then
+# echo "Gaussian terminated abnormally." >&2
+# exit 1
+#fi
+# Remove the cron entry to periodically stage the data to storage
+#crontab -l | grep -v "$AIRAVATA_ExptDataDir"" | crontab -
+#crontab -l
+#rm $mycron
+
+#rm $copysubdir/$gaussian_output
+cp -p $gaussian_output $copysubdir
+ if [ -f "$Chkfile" ]; then
+ cp -p $Chkfile $copysubdir
+ fi
+ if [ -f "$GAUSS_SCRDIR/$Chkfile" ]; then
+ cp -p $GAUSS_SCRDIR/$Chkfile .
+ fi
+ # Save checkpoint file to usrchkdir
+ #mkdir -p $usrchkdir
+ if [ -f "$Chkfile" ]; then
+ formchk $Chkfile
+ cp -f $Chkfile $usrchkdir
+ cp -f *.fchk $usrchkdir
+ cp -f *.fchk $copysubdir
+ fi
+#remove rwf files
+rm *.rwf*
+cd $HOME/scratch
+#ln -s $subdir $PBS_JOBID
+ls -l $SLURM_JOBID
+rm $SLURM_JOBID/*.rwf*
+echo gaussian_output_log=$cwd/$gaussian_output
+cat: S: No such file or directory
diff --git a/modules/agent-framework/airavata-agent/application/gromacs.sh
b/modules/agent-framework/airavata-agent/application/gromacs.sh
new file mode 100644
index 0000000000..dacb52f24e
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/gromacs.sh
@@ -0,0 +1,165 @@
#!/bin/sh -x
# gromacs.sh -- SEAGrid/Airavata wrapper: stages restart data from a previous
# local job, runs GROMACS mdrun under SLURM, and copies results back so the
# gateway can pick them up.
#
# Usage: $0 -c coord_file -s tpr_file -g log_file -e ener_file \
#           [SEAGrid_UserName] [Gromacs_restart_input]
#
# Flags are NOT parsed with getopts; values arrive at fixed positions:
#   $2=coord, $4=tpr, $6=log, $8=ener, $9=username fallback, ${10}=restart input.

if [ $# -lt 1 -o $# -gt 11 ]; then
  echo 1>&2 "Usage: $0 -c coord_file -s tpr_file -g log_file -e ener_file [SEAGrid_UserName] [Gromacs_restart_input] "
  exit 127
fi

# The run directory is wherever the gateway launched us.
subdir="$PWD"

Coord_file=$2
Tpr_file=$4
Log_file=$6
Ener_file=$8
# BUGFIX: "$10" expands as "${1}0" in sh; braces are required for argument 10.
Rest_file=${10}

# Resolve the gateway user: prefer the Airavata-provided env var, fall back
# to the 9th positional argument, otherwise abort.
if [ $AIRAVATA_USERNAME ]; then
  echo " The Airavata Gateway User is $AIRAVATA_USERNAME"
  SG_UserName="$AIRAVATA_USERNAME"
elif [ $9 ]; then
  SG_UserName=$9
else
  echo " The Airavata Gateway User is missing "
  exit
fi

# Normalize line endings of the restart input, if one was provided.
# NOTE(review): Gromacs_res_input_full is never assigned in this script
# (the original assignment was commented out), so guard the call instead of
# invoking dos2unix with no file argument.
if [ "${Gromacs_res_input_full}" ]; then
  dos2unix -k $Gromacs_res_input_full
fi
Gromacs_input=${Gromacs_res_input_full##*/}
Gromacs_output=${Gromacs_input%.*}.log
clobber="1"     # set to "" for rerun or debug; otherwise set to 1
gcvars="GCVARS"
null=""
localarc="$HOME/scratch"
# Next line will usually return "cp: `GCVARS' and `GCVARS' are the same file"
if [ "$gcvars" ] ; then cp -p $gcvars GCVARS 2>/dev/null ; fi

usrchkdir=$localarc/${SG_UserName}/
echo " The Airavata Gateway User Directory is $usrchkdir"
copysubdir="./"
# The way a "false" boolean variable is passed seems to be unstable; it's
# been "", "0" and "false", so we try to cover all reasonable possibilities.
if [ ! "$clobber" ] ; then
  clobber=0
fi
if [ "$clobber" = "0" -o "$clobber" = "false" -o "$clobber" = "no" ] ; then
  # Rerun/debug mode: if the output already exists, report it and stop.
  if [ -s $copysubdir/$Gromacs_output ] ; then
    echo Gromacs_output_log=$copysubdir/$Gromacs_output
    exit 0
  fi
  Gromacs_output_full=${Gromacs_input_full%.*}.log
  if [ -s $Gromacs_output_full ] ; then
    mkdir -p $copysubdir
    rm -f $copysubdir/$Gromacs_output # clean up symlink if something went wrong earlier
    if [ $Gromacs_input_full -nt $copysubdir/$Gromacs_input ] ; then sed 's/\r$//' $Gromacs_input_full > $copysubdir/$Gromacs_input ; fi
    cp -up $Gromacs_output_full $copysubdir
    echo Gromacs_output_log=$Gromacs_output_full
    exit 0
  fi
fi

# Recover the previous local job id (if any) from restart.txt so that job's
# files can be staged into the current run directory.
# NOTE(review): this line was garbled by email wrapping in the committed
# patch; reconstructed from context -- confirm against the original script.
export PJobID=`grep -i localjobid restart.txt 2>/dev/null | awk -F= '{print $2}' | sed "s/ //g"`
if [ ${PJobID:-null} != "$null" ]; then
  cp -r /home/scigap/scratch/${SG_UserName}/$PJobID*/* .
fi

mkdir -p $copysubdir
mkdir -p $subdir
cd $subdir
cwd=`pwd`
if [ $Gromacs_input_full -nt $copysubdir/$Gromacs_input ] ; then sed 's/\r$//' $Gromacs_input_full > $copysubdir/$Gromacs_input ; fi
cd $copysubdir
rm -f $Gromacs_output
# Keep a symlink in the run directory when output is collected elsewhere.
if [ "$cwd" != "$subdir" ]; then
  ln -s $cwd/$Gromacs_output $subdir/$Gromacs_output
fi
cd $cwd
if [ $Gromacs_input_full -nt $Gromacs_input ] ; then sed 's/\r$//' $Gromacs_input_full > $Gromacs_input ; fi

module unload intel; module load gromacs
if [ ${PJobID:-null} != "$null" ]; then
  # Restarted run: continue from the staged GROMACS checkpoint.
  cp -r /home/scigap/scratch/${SG_UserName}/$PJobID*/* .
  mpirun -np $SLURM_NTASKS -genv I_MPI_FABRICS shm:ofa gmx_mpi mdrun -v -deffnm em -s $Tpr_file -cpi state.cpt
else
  # Fresh run.
  mpirun -np $SLURM_NTASKS -genv I_MPI_FABRICS shm:ofa gmx_mpi mdrun -v -deffnm em -s $Tpr_file -c $Coord_file -g $Log_file -e $Ener_file
fi

# NOTE(review): this re-invokes mdrun from state.cpt after the run above has
# already finished; it looks like a leftover and should probably be removed.
# Kept for behavioral parity with the original script -- confirm intent.
mpiexec gmx_mpi mdrun -s $Tpr_file -cpi state.cpt

cp -p $Gromacs_output $copysubdir
# Create a link directory for this job (keyed by job id) in user scratch.
# BUGFIX: original used $SG_USERNAME (undefined -- the variable is
# SG_UserName) and $PBS_JOBID on a SLURM cluster; prefer SLURM_JOBID.
ln -s $PWD ~/scratch/${SG_UserName}/${SLURM_JOBID:-$PBS_JOBID}
echo Gromacs_output_log=$cwd/$Gromacs_output
diff --git a/modules/agent-framework/airavata-agent/application/namd.sh
b/modules/agent-framework/airavata-agent/application/namd.sh
new file mode 100644
index 0000000000..240cc2ad10
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/namd.sh
@@ -0,0 +1,130 @@
#!/bin/bash -x
# namd.sh -- launch an Airavata agent plus NUM_REPLICAS concurrent NAMD3
# replicas, throttled through a FIFO token queue: one token total for CPU
# runs (serial replicas), one token per GPU for GPU runs.
set -euo pipefail

# ----------------------------------------------------------------------
# SETUP
# ----------------------------------------------------------------------
export PATH=$PWD:$PATH
export WORKDIR=$PWD
export CS_HOME=$HOME/cybershuttle
export MAMBA_ROOT_PREFIX=$CS_HOME/scratch
export TMPDIR=$CS_HOME/scratch/tmp

# initialize scratch/tmp and scratch/envs (node-local)
# BUGFIX: readlink exits non-zero when the path is not a symlink, which
# would abort the whole script under `set -e`; tolerate that case.
CS_TEMP=$(readlink "$CS_HOME/scratch/tmp" || true)
CS_ENVS=$(readlink "$CS_HOME/scratch/envs" || true)
[ -n "$CS_TEMP" ] && mkdir -p "$CS_TEMP"
[ -n "$CS_ENVS" ] && mkdir -p "$CS_ENVS"
NAMD_EXTRA_ARGS=()
# FIFO acts as a blocking token queue that limits replica concurrency.
FIFO=$(mktemp -u)
mkfifo "$FIFO"

# ----------------------------------------------------------------------
# PARSE COMMAND LINE ARGUMENTS
# ----------------------------------------------------------------------

# These must arrive via the environment (set by the gateway).
required_vars=("NAMD_CPU_PATH" "NAMD_GPU_PATH" "NAMD_CPU_MODULES" "NAMD_GPU_MODULES")
for var in "${required_vars[@]}"; do
  # BUGFIX: with `set -u`, ${!var} on an unset name aborts before the
  # friendly diagnostic can run; default to "" so the check fires instead.
  if [ -z "${!var:-}" ]; then
    echo "$var is not set"
    exit 2
  fi
done

while getopts t:n:i:a:s: option; do
  case $option in
  t)
    if [[ "$OPTARG" != "CPU" && "$OPTARG" != "GPU" ]]; then
      echo "invalid argument -t $OPTARG: must be CPU|GPU."
      exit 2
    fi
    EXECUTION_TYPE=$OPTARG
    echo "EXECUTION_TYPE=$EXECUTION_TYPE"
    module reset
    if [ $EXECUTION_TYPE = "CPU" ]; then
      # one replica at a time: seed a single token
      echo 0 > $FIFO &
      NAMD_PATH=$NAMD_CPU_PATH
      module load $NAMD_CPU_MODULES
    elif [ $EXECUTION_TYPE = "GPU" ]; then
      # one replica per GPU: seed one token per device index
      for ((i=0; i<${SLURM_GPUS_ON_NODE:-0}; i++)); do echo "$i" > $FIFO & done
      NAMD_PATH=$NAMD_GPU_PATH
      NAMD_EXTRA_ARGS+=("--CUDASOAintegrate" "on")
      module load $NAMD_GPU_MODULES
    fi
    module list
    ;;
  n)
    NUM_REPLICAS=$OPTARG
    echo "NUM_REPLICAS=$NUM_REPLICAS"
    ;;
  i)
    # Snapshot the staged input files (everything except scheduler files
    # and stdout/stderr logs) so each replica gets its own copy.
    NAMD_INPUT_FILES=$(find $WORKDIR -maxdepth 1 -type f ! -name "*slurm*" ! -name "*.stdout" ! -name "*.stderr")
    NAMD_CONF_FILE=$OPTARG
    echo "NAMD_INPUT_FILES=$NAMD_INPUT_FILES"
    echo "NAMD_CONF_FILE=$NAMD_CONF_FILE"
    ;;
  a)
    AGENT_ID=$OPTARG
    echo "AGENT_ID=$AGENT_ID"
    ;;
  s)
    AGENT_SERVER=$OPTARG
    echo "AGENT_SERVER=$AGENT_SERVER"
    ;;
  \?)
    echo 1>&2 "Usage: $0"
    echo 1>&2 "  -t [CPU|GPU]"
    echo 1>&2 "  -n [NUM_REPLICAS]"
    echo 1>&2 "  -i [NAMD_CONF_FILE]"
    echo 1>&2 "  -a [AGENT_ID]"
    echo 1>&2 "  -s [AGENT_SERVER]"
    exit 2
    ;;
  esac
done
shift $((OPTIND - 1))

# ----------------------------------------------------------------------
# RUN AGENT
# ----------------------------------------------------------------------

wget -q https://github.com/cyber-shuttle/binaries/releases/download/1.0.1/airavata-agent-linux-amd64 -O $WORKDIR/airavata-agent
wget -q https://github.com/cyber-shuttle/binaries/releases/download/1.0.1/kernel.py -O $WORKDIR/kernel.py
wget -q https://github.com/mamba-org/micromamba-releases/releases/download/2.3.0-1/micromamba-linux-64 -O $WORKDIR/micromamba
chmod +x $WORKDIR/airavata-agent $WORKDIR/micromamba
$WORKDIR/airavata-agent --server "$AGENT_SERVER:19900" --agent "$AGENT_ID" --environ "$AGENT_ID" --lib "" --pip "" &
AGENT_PID=$!
# Stop the agent and remove the token FIFO no matter how we exit.
# (FIFO cleanup added; the original leaked the mktemp node.)
trap 'kill -TERM $AGENT_PID; rm -f "$FIFO"' EXIT
echo "Agent started with PID $AGENT_PID"


# ----------------------------------------------------------------------
# RUN NAMD3
# ----------------------------------------------------------------------
PIDS=()
for REPLICA_ID in $(seq 1 $NUM_REPLICAS); do
  (
    # Block until an execution slot (token) is free.
    read TOKEN <$FIFO

    REPLICA_DIR=$WORKDIR/$REPLICA_ID
    mkdir -p $REPLICA_DIR   # -p: tolerate a leftover directory from a rerun
    cp $NAMD_INPUT_FILES $REPLICA_DIR/

    # On GPU runs the token doubles as the device index for this replica.
    [[ $EXECUTION_TYPE == "GPU" ]] && export CUDA_VISIBLE_DEVICES=$TOKEN
    $NAMD_PATH/namd3 +setcpuaffinity +p $SLURM_CPUS_ON_NODE --cwd $REPLICA_DIR "${NAMD_EXTRA_ARGS[@]}" \
      $REPLICA_DIR/$NAMD_CONF_FILE >$REPLICA_DIR/$NAMD_CONF_FILE.out 2>$REPLICA_DIR/$NAMD_CONF_FILE.err
    [[ $EXECUTION_TYPE == "GPU" ]] && unset CUDA_VISIBLE_DEVICES

    # Return the token so the next queued replica can start.
    echo $TOKEN > $FIFO &

    # Collect outputs into WORKDIR, prefixed by replica id, then drop the dir.
    for FILE in $(ls $REPLICA_DIR/*.*); do
      mv $FILE $REPLICA_ID"_"$(basename $FILE)
    done
    rm -rf $REPLICA_DIR/

  ) &
  PIDS+=($!)
done
wait "${PIDS[@]}"
diff --git a/modules/agent-framework/airavata-agent/application/pmemd_cuda.sh
b/modules/agent-framework/airavata-agent/application/pmemd_cuda.sh
new file mode 100644
index 0000000000..59378da8bb
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/pmemd_cuda.sh
@@ -0,0 +1,4 @@
+#!/bin/bash -x
+set -euo pipefail
+
+srun pmemd.cuda "$@"
diff --git a/modules/agent-framework/airavata-agent/application/pmemd_mpi.sh
b/modules/agent-framework/airavata-agent/application/pmemd_mpi.sh
new file mode 100644
index 0000000000..b00bd75162
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/pmemd_mpi.sh
@@ -0,0 +1,4 @@
+#!/bin/bash -x
+set -euo pipefail
+
+srun pmemd.MPI "$@"
diff --git a/modules/agent-framework/airavata-agent/application/psi4.sh
b/modules/agent-framework/airavata-agent/application/psi4.sh
new file mode 100644
index 0000000000..404c713d33
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/application/psi4.sh
@@ -0,0 +1,4 @@
+#!/bin/bash -x
+set -euo pipefail
+
+psi4 "$@"