This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch sdk-batch-jobs in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 5f50ad6ca23f74759542611e28456827694908c6 Author: yasithdev <[email protected]> AuthorDate: Fri Aug 1 07:04:48 2025 -0500 add file linking logic --- .../airavata_jupyter_magic/__init__.py | 30 ++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py index ab2ecd8bf8..c445e654b2 100644 --- a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py +++ b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py @@ -103,6 +103,7 @@ RuntimeInfo = NamedTuple('RuntimeInfo', [ ('envName', str), ('pids', list[int]), ('tunnels', dict[str, tuple[str, int]]), + ('links', dict[str, str]), ]) PENDING_STATES = [ @@ -516,11 +517,22 @@ def submit_agent_job( assert (pip := additional_dependencies.get("pip", None)) is not None, "missing /additional_dependencies/pip section" mounts = [f"{i['identifier']}:{i['mount_point']}" for i in collection] + # if plan file is provided, link its runs into the workspace + links = {} if plan_file is not None: - assert Path(plan_file).exists(), f"File {plan_file} does not exist" + assert Path(plan_file).exists(), f"Plan {plan_file} does not exist" + assert Path(plan_file).is_file(), f"Plan {plan_file} is not a file" with open(Path(plan_file), "r") as f: - plan = yaml.safe_load(f) - assert plan.get("experimentId") is not None, "missing experimentId in state file" + from airavata_experiments.base import Plan + plan = Plan(**json.load(f)) + for task in plan.tasks: + plan_cluster = str(task.runtime.args.get("cluster", "N/A")) + if plan_cluster == "N/A": + print(f"[av] skipping plan task {task.name}: cluster not specified in the plan") + continue + assert plan_cluster == cluster, f"[av] cluster mismatched: {plan_cluster} in plan, {cluster} in request" + assert task.pid is not None, f"[av] plan task {task.name} has no pid" + links[task.pid] = task.name # payload data = { @@ 
-547,6 +559,7 @@ def submit_agent_job( print(f"* libraries={data['libraries']}", flush=True) print(f"* pip={data['pip']}", flush=True) print(f"* mounts={data['mounts']}", flush=True) + print(f"* links={links}", flush=True) # Send the POST request headers = generate_headers(access_token, gateway_id) @@ -577,6 +590,7 @@ def submit_agent_job( envName=obj['envName'], pids=[], tunnels={}, + links=links, ) state.all_runtimes[rt_name] = rt print(f'Requested runtime={rt_name}', flush=True) @@ -1252,8 +1266,14 @@ def wait_for_runtime(line: str): random_port = random.randint(2000, 6000) * 5 launch_remote_kernel(rt_name, random_port, hostname="127.0.0.1") print(f"Remote Jupyter kernel launched and connected for runtime={rt_name}.") - return - + + # create symlinks + for pid, name in rt.links.items(): + try: + state.kernel_clients[rt_name].execute(f"!ln -s ../{pid} {name}", silent=True, store_history=False) + print(f"[av] linked ../{pid} -> {name}") + except Exception as e: + print(f"[av] failed to link ../{pid} -> {name}: {e}") @register_line_magic def run_subprocess(line: str):
