This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch sdk-batch-jobs
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 5f50ad6ca23f74759542611e28456827694908c6
Author: yasithdev <[email protected]>
AuthorDate: Fri Aug 1 07:04:48 2025 -0500

    add file linking logic
---
 .../airavata_jupyter_magic/__init__.py             | 30 ++++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py 
b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
index ab2ecd8bf8..c445e654b2 100644
--- a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
+++ b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py
@@ -103,6 +103,7 @@ RuntimeInfo = NamedTuple('RuntimeInfo', [
     ('envName', str),
     ('pids', list[int]),
     ('tunnels', dict[str, tuple[str, int]]),
+    ('links', dict[str, str]),
 ])
 
 PENDING_STATES = [
@@ -516,11 +517,22 @@ def submit_agent_job(
         assert (pip := additional_dependencies.get("pip", None)) is not None, 
"missing /additional_dependencies/pip section"
         mounts = [f"{i['identifier']}:{i['mount_point']}" for i in collection]
 
+    # if plan file is provided, link its runs into the workspace
+    links = {}
     if plan_file is not None:
-        assert Path(plan_file).exists(), f"File {plan_file} does not exist"
+        assert Path(plan_file).exists(), f"Plan {plan_file} does not exist"
+        assert Path(plan_file).is_file(), f"Plan {plan_file} is not a file"
         with open(Path(plan_file), "r") as f:
-            plan = yaml.safe_load(f)
-        assert plan.get("experimentId") is not None, "missing experimentId in 
state file"
+            from airavata_experiments.base import Plan
+            plan = Plan(**json.load(f))
+        for task in plan.tasks:
+            plan_cluster = str(task.runtime.args.get("cluster", "N/A"))
+            if plan_cluster == "N/A":
+                print(f"[av] skipping plan task {task.name}: cluster not 
specified in the plan")
+                continue
+            assert plan_cluster == cluster, f"[av] cluster mismatched: 
{plan_cluster} in plan, {cluster} in request"
+            assert task.pid is not None, f"[av] plan task {task.name} has no 
pid"
+            links[task.pid] = task.name
 
     # payload
     data = {
@@ -547,6 +559,7 @@ def submit_agent_job(
     print(f"* libraries={data['libraries']}", flush=True)
     print(f"* pip={data['pip']}", flush=True)
     print(f"* mounts={data['mounts']}", flush=True)
+    print(f"* links={links}", flush=True)
 
     # Send the POST request
     headers = generate_headers(access_token, gateway_id)
@@ -577,6 +590,7 @@ def submit_agent_job(
             envName=obj['envName'],
             pids=[],
             tunnels={},
+            links=links,
         )
         state.all_runtimes[rt_name] = rt
         print(f'Requested runtime={rt_name}', flush=True)
@@ -1252,8 +1266,14 @@ def wait_for_runtime(line: str):
         random_port = random.randint(2000, 6000) * 5
         launch_remote_kernel(rt_name, random_port, hostname="127.0.0.1")
         print(f"Remote Jupyter kernel launched and connected for 
runtime={rt_name}.")
-    return
-
+    
+    # create symlinks
+    for pid, name in rt.links.items():
+        try:
+            state.kernel_clients[rt_name].execute(f"!ln -s ../{pid} {name}", 
silent=True, store_history=False)
+            print(f"[av] linked ../{pid} -> {name}")
+        except Exception as e:
+            print(f"[av] failed to link ../{pid} -> {name}: {e}")
 
 @register_line_magic
 def run_subprocess(line: str):

Reply via email to