github-advanced-security[bot] commented on code in PR #559:
URL: https://github.com/apache/airavata/pull/559#discussion_r2345291179


##########
modules/research-framework/simexr_mod/api/routers/simulation.py:
##########
@@ -0,0 +1,510 @@
+"""
+Simulation execution API endpoints.
+"""
+
+import time
+from typing import List
+from fastapi import APIRouter, HTTPException, Depends, Request
+from pathlib import Path
+
+from ..models import (
+    SingleSimulationRequest, BatchSimulationRequest, SimulationResult,
+    BatchSimulationResponse, StatusResponse, ErrorResponse
+)
+from ..dependencies import get_simulation_service, get_data_service, 
get_database
+from core.interfaces import SimulationStatus
+
+
+router = APIRouter()
+
+
[email protected]("/import/github", summary="Import simulation from GitHub")
+async def import_from_github(
+    github_url: str,
+    model_name: str,
+    description: str = "",
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Import a simulation model from a GitHub URL.
+    
+    - **github_url**: URL to the GitHub script (e.g., 
https://github.com/user/repo/blob/main/script.py)
+    - **model_name**: Name for the imported model
+    - **description**: Optional description of the model
+    
+    Returns the generated model ID.
+    """
+    try:
+        # Extract parameters info from the script if possible
+        parameters = {
+            "github_url": "Source URL",
+            "imported": "Imported from GitHub"
+        }
+        
+        model_id = simulation_service.import_model_from_github(
+            github_url=github_url,
+            model_name=model_name,
+            description=description,
+            parameters=parameters
+        )
+        
+        return {
+            "status": "success",
+            "model_id": model_id,
+            "message": f"Successfully imported model from {github_url}",
+            "github_url": github_url,
+            "model_name": model_name
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to import from 
GitHub: {str(e)}")
+
+
[email protected]("/transform/github", summary="Transform GitHub script using 
transform_code")
+async def transform_github_script(
+    github_url: str,
+    model_name: str,
+    max_smoke_iters: int = 3
+):
+    """
+    Transform a GitHub script using the transform_code module.
+    
+    This endpoint uses ExternalScriptImporter to:
+    1. Import the script from GitHub
+    2. Refactor it to have a simulate(**params) function
+    3. Refine it through smoke testing and fixes
+    4. Return the model_id and metadata
+    
+    - **github_url**: URL to the GitHub script
+    - **model_name**: Name for the imported model
+    - **max_smoke_iters**: Maximum smoke test iterations (default: 3)
+    
+    Returns the generated model ID and processing details.
+    """
+    try:
+        print(f"[TRANSFORM API] Starting transform process for {github_url}")
+        from execute.loader.transform_code import ExternalScriptImporter
+        import tempfile
+        import os
+        
+        # Create importer
+        print("[TRANSFORM API] Creating ExternalScriptImporter...")
+        importer = ExternalScriptImporter()
+        
+        # Create temporary directory for processing
+        print(f"[TRANSFORM API] Creating temporary directory...")
+        with tempfile.TemporaryDirectory() as temp_dir:
+            print(f"[TRANSFORM API] Temporary directory created: {temp_dir}")
+            # Import and refactor using transform_code
+            print(f"[TRANSFORM API] Calling import_and_refactor...")
+            model_id, metadata = importer.import_and_refactor(
+                source_url=github_url,
+                model_name=model_name,
+                dest_dir=temp_dir,
+                max_smoke_iters=max_smoke_iters
+            )
+            
+            print(f"[TRANSFORM API] Import and refactor completed. Model ID: 
{model_id}")
+            # Get the final script path from the database
+            from db import get_simulation_path
+            try:
+                script_path = get_simulation_path(model_id)
+                print(f"[TRANSFORM API] Script path from database: 
{script_path}")
+            except:
+                # Fallback to expected path
+                script_path = f"external_models/{model_name}.py"
+                print(f"[TRANSFORM API] Using fallback script path: 
{script_path}")
+            
+            # Read the final script content
+            print(f"[TRANSFORM API] Reading script content...")
+            with open(script_path, 'r') as f:

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/43)



##########
modules/research-framework/simexr_mod/api/routers/simulation.py:
##########
@@ -0,0 +1,510 @@
+"""
+Simulation execution API endpoints.
+"""
+
+import time
+from typing import List
+from fastapi import APIRouter, HTTPException, Depends, Request
+from pathlib import Path
+
+from ..models import (
+    SingleSimulationRequest, BatchSimulationRequest, SimulationResult,
+    BatchSimulationResponse, StatusResponse, ErrorResponse
+)
+from ..dependencies import get_simulation_service, get_data_service, 
get_database
+from core.interfaces import SimulationStatus
+
+
+router = APIRouter()
+
+
[email protected]("/import/github", summary="Import simulation from GitHub")
+async def import_from_github(
+    github_url: str,
+    model_name: str,
+    description: str = "",
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Import a simulation model from a GitHub URL.
+    
+    - **github_url**: URL to the GitHub script (e.g., 
https://github.com/user/repo/blob/main/script.py)
+    - **model_name**: Name for the imported model
+    - **description**: Optional description of the model
+    
+    Returns the generated model ID.
+    """
+    try:
+        # Extract parameters info from the script if possible
+        parameters = {
+            "github_url": "Source URL",
+            "imported": "Imported from GitHub"
+        }
+        
+        model_id = simulation_service.import_model_from_github(
+            github_url=github_url,
+            model_name=model_name,
+            description=description,
+            parameters=parameters
+        )
+        
+        return {
+            "status": "success",
+            "model_id": model_id,
+            "message": f"Successfully imported model from {github_url}",
+            "github_url": github_url,
+            "model_name": model_name
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to import from 
GitHub: {str(e)}")
+
+
[email protected]("/transform/github", summary="Transform GitHub script using 
transform_code")
+async def transform_github_script(
+    github_url: str,
+    model_name: str,
+    max_smoke_iters: int = 3
+):
+    """
+    Transform a GitHub script using the transform_code module.
+    
+    This endpoint uses ExternalScriptImporter to:
+    1. Import the script from GitHub
+    2. Refactor it to have a simulate(**params) function
+    3. Refine it through smoke testing and fixes
+    4. Return the model_id and metadata
+    
+    - **github_url**: URL to the GitHub script
+    - **model_name**: Name for the imported model
+    - **max_smoke_iters**: Maximum smoke test iterations (default: 3)
+    
+    Returns the generated model ID and processing details.
+    """
+    try:
+        print(f"[TRANSFORM API] Starting transform process for {github_url}")
+        from execute.loader.transform_code import ExternalScriptImporter
+        import tempfile
+        import os
+        
+        # Create importer
+        print("[TRANSFORM API] Creating ExternalScriptImporter...")
+        importer = ExternalScriptImporter()
+        
+        # Create temporary directory for processing
+        print(f"[TRANSFORM API] Creating temporary directory...")
+        with tempfile.TemporaryDirectory() as temp_dir:
+            print(f"[TRANSFORM API] Temporary directory created: {temp_dir}")
+            # Import and refactor using transform_code
+            print(f"[TRANSFORM API] Calling import_and_refactor...")
+            model_id, metadata = importer.import_and_refactor(
+                source_url=github_url,
+                model_name=model_name,
+                dest_dir=temp_dir,
+                max_smoke_iters=max_smoke_iters
+            )
+            
+            print(f"[TRANSFORM API] Import and refactor completed. Model ID: 
{model_id}")
+            # Get the final script path from the database
+            from db import get_simulation_path
+            try:
+                script_path = get_simulation_path(model_id)
+                print(f"[TRANSFORM API] Script path from database: 
{script_path}")
+            except:
+                # Fallback to expected path
+                script_path = f"external_models/{model_name}.py"
+                print(f"[TRANSFORM API] Using fallback script path: 
{script_path}")
+            
+            # Read the final script content
+            print(f"[TRANSFORM API] Reading script content...")
+            with open(script_path, 'r') as f:
+                script_content = f.read()
+            print(f"[TRANSFORM API] Script content length: 
{len(script_content)}")
+            
+            return {
+                "status": "success",
+                "model_id": model_id,
+                "message": f"Successfully transformed script from 
{github_url}",
+                "github_url": github_url,
+                "model_name": model_name,
+                "script_path": script_path,
+                "script_content": script_content,
+                "metadata": metadata,
+                "processing_details": {
+                    "max_smoke_iters": max_smoke_iters,
+                    "script_size": len(script_content),
+                    "temp_directory": temp_dir
+                }
+            }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to transform 
GitHub script: {str(e)}")
+
+
[email protected]("/run", response_model=SimulationResult, summary="Run single 
simulation")
+async def run_single_simulation(
+    request: SingleSimulationRequest,
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Execute a single simulation with given parameters.
+    
+    - **model_id**: ID of the simulation model
+    - **parameters**: Dictionary of simulation parameters
+    
+    Returns the simulation result with outputs and execution metadata.
+    """
+    try:
+        # Use the service layer
+        result = simulation_service.run_single_simulation(
+            model_id=request.model_id,
+            parameters=request.parameters.model_dump()
+        )
+        
+        # Convert to API response format
+        return SimulationResult(
+            success=result.status == SimulationStatus.COMPLETED,
+            parameters=result.parameters,
+            results=result.outputs,
+            execution_time=result.execution_time,
+            stdout=result.stdout,
+            stderr=result.stderr,
+            error_message=result.error_message
+        )
+        
+    except FileNotFoundError:
+        raise HTTPException(status_code=404, detail=f"Model {request.model_id} 
not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Simulation failed: 
{str(e)}")
+
+
[email protected]("/batch", response_model=BatchSimulationResponse, summary="Run 
batch simulations")
+async def run_batch_simulation(
+    request: BatchSimulationRequest,
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Execute multiple simulations in batch with different parameter sets.
+    
+    - **model_id**: ID of the simulation model
+    - **parameter_grid**: List of parameter dictionaries
+    
+    Returns batch execution results with statistics.
+    """
+    try:
+        start_time = time.time()
+        
+        # Convert parameter grid
+        param_grid = [params.model_dump() for params in request.parameter_grid]
+        
+        # Use the service layer
+        results = simulation_service.run_batch_simulations(
+            model_id=request.model_id,
+            parameter_grid=param_grid
+        )
+        
+        # Convert to API response format
+        api_results = []
+        for result in results:
+            api_result = SimulationResult(
+                success=result.status == SimulationStatus.COMPLETED,
+                parameters=result.parameters,
+                results=result.outputs,
+                execution_time=result.execution_time,
+                stdout=result.stdout,
+                stderr=result.stderr,
+                error_message=result.error_message
+            )
+            api_results.append(api_result)
+        
+        execution_time = time.time() - start_time
+        successful_runs = sum(1 for r in api_results if r.success)
+        failed_runs = len(api_results) - successful_runs
+        
+        return BatchSimulationResponse(
+            status="completed",
+            total_runs=len(api_results),
+            successful_runs=successful_runs,
+            failed_runs=failed_runs,
+            results=api_results,
+            execution_time=execution_time
+        )
+        
+    except FileNotFoundError:
+        raise HTTPException(status_code=404, detail=f"Model {request.model_id} 
not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Batch simulation failed: 
{str(e)}")
+
+
[email protected]("/models", summary="List available simulation models")
+async def list_models(simulation_service = Depends(get_simulation_service)):
+    """
+    Get a list of all available simulation models.
+    
+    Returns list of models with basic information.
+    """
+    try:
+        models = simulation_service.list_models()
+        return {
+            "status": "success",
+            "count": len(models),
+            "models": models
+        }
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to list models: 
{str(e)}")
+
+
[email protected]("/models/search", summary="Search models by name (fuzzy search)")
+async def search_models_by_name(
+    name: str,
+    limit: int = 20,
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Search for simulation models by name using fuzzy matching.
+    
+    - **name**: Partial name to search for (case-insensitive)
+    - **limit**: Maximum number of results to return (default: 20)
+    
+    Returns models that match the search criteria.
+    """
+    try:
+        import re
+        
+        # Get all models
+        all_models = simulation_service.list_models()
+        
+        # Convert search term to lowercase for case-insensitive matching
+        search_term = name.lower()
+        
+        # Filter models using fuzzy matching
+        matching_models = []
+        for model in all_models:
+            model_name = model.get('name', '').lower()
+            model_id = model.get('id', '').lower()
+            
+            # Check if search term appears in model name or ID
+            if (search_term in model_name or 
+                search_term in model_id or
+                any(word in model_name for word in search_term.split()) or
+                any(word in model_id for word in search_term.split())):
+                matching_models.append(model)
+        
+        # Sort by relevance (exact matches first, then partial matches)
+        def relevance_score(model):
+            model_name = model.get('name', '').lower()
+            model_id = model.get('id', '').lower()
+            
+            # Exact match gets highest score
+            if search_term == model_name or search_term == model_id:
+                return 100
+            # Starts with search term
+            elif model_name.startswith(search_term) or 
model_id.startswith(search_term):
+                return 90
+            # Contains search term
+            elif search_term in model_name or search_term in model_id:
+                return 80
+            # Word boundary matches
+            elif any(word in model_name for word in search_term.split()):
+                return 70
+            else:
+                return 50
+        
+        # Sort by relevance and limit results
+        matching_models.sort(key=relevance_score, reverse=True)
+        limited_models = matching_models[:limit]
+        
+        return {
+            "status": "success",
+            "search_term": name,
+            "total_matches": len(matching_models),
+            "returned_count": len(limited_models),
+            "limit": limit,
+            "models": limited_models
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to search models: 
{str(e)}")
+
+
[email protected]("/models/{model_id}", summary="Get model information")
+async def get_model_info(model_id: str, simulation_service = 
Depends(get_simulation_service)):
+    """
+    Get detailed information about a specific simulation model.
+    
+    - **model_id**: ID of the simulation model
+    
+    Returns model metadata and script information.
+    """
+    try:
+        model_info = simulation_service.get_model_info(model_id)
+        
+        return {
+            "status": "success",
+            "model": model_info
+        }
+        
+    except ValueError as e:
+        raise HTTPException(status_code=404, detail=str(e))
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to get model 
info: {str(e)}")
+
+
[email protected]("/models/{model_id}/results", summary="Get simulation results")
+async def get_model_results(
+    model_id: str, 
+    limit: int = 100,
+    offset: int = 0,
+    data_service = Depends(get_data_service)
+):
+    """
+    Get simulation results for a specific model.
+    
+    - **model_id**: ID of the simulation model
+    - **limit**: Maximum number of results to return (default: 100)
+    - **offset**: Number of results to skip (default: 0)
+    
+    Returns paginated simulation results.
+    """
+    try:
+        results_data = data_service.get_simulation_results(
+            model_id=model_id, 
+            limit=limit, 
+            offset=offset
+        )
+        
+        return {
+            "status": "success",
+            **results_data
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to get results: 
{str(e)}")
+
+
[email protected]("/models/{model_id}/results", summary="Clear model results")
+async def clear_model_results(model_id: str, db = Depends(get_database)):
+    """
+    Clear all simulation results for a specific model.
+    
+    - **model_id**: ID of the simulation model
+    
+    Returns confirmation of deletion.
+    """
+    try:
+        # Delete results from database
+        with db.config.get_sqlite_connection() as conn:
+            cursor = conn.execute(
+                "DELETE FROM results WHERE model_id = ?", 
+                (model_id,)
+            )
+            deleted_count = cursor.rowcount
+        
+        return {
+            "status": "success",
+            "message": f"Deleted {deleted_count} results for model {model_id}",
+            "deleted_count": deleted_count
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to clear results: 
{str(e)}")
+
+
[email protected]("/models/{model_id}/script", summary="Get model script")
+async def get_model_script(model_id: str, simulation_service = 
Depends(get_simulation_service)):
+    """
+    Get the refactored script for a specific model.
+    
+    - **model_id**: ID of the simulation model
+    
+    Returns the script content.
+    """
+    try:
+        model_info = simulation_service.get_model_info(model_id)
+        script_path = model_info.get("script_path")
+        
+        # Try to find the script in common locations if script_path is not 
available
+        if not script_path:
+            # Look for script in external_models directory
+            possible_paths = [
+                f"external_models/{model_id}.py",
+                f"external_models/{model_info.get('name', model_id)}.py",
+                f"systems/models/{model_id}.py",
+                f"systems/models/{model_info.get('name', model_id)}.py"
+            ]
+            
+            for path in possible_paths:
+                if Path(path).exists():
+                    script_path = path
+                    break
+        
+        if not script_path or not Path(script_path).exists():

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/45)



##########
modules/research-framework/simexr_mod/api/routers/simulation.py:
##########
@@ -0,0 +1,510 @@
+"""
+Simulation execution API endpoints.
+"""
+
+import time
+from typing import List
+from fastapi import APIRouter, HTTPException, Depends, Request
+from pathlib import Path
+
+from ..models import (
+    SingleSimulationRequest, BatchSimulationRequest, SimulationResult,
+    BatchSimulationResponse, StatusResponse, ErrorResponse
+)
+from ..dependencies import get_simulation_service, get_data_service, 
get_database
+from core.interfaces import SimulationStatus
+
+
+router = APIRouter()
+
+
[email protected]("/import/github", summary="Import simulation from GitHub")
+async def import_from_github(
+    github_url: str,
+    model_name: str,
+    description: str = "",
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Import a simulation model from a GitHub URL.
+    
+    - **github_url**: URL to the GitHub script (e.g., 
https://github.com/user/repo/blob/main/script.py)
+    - **model_name**: Name for the imported model
+    - **description**: Optional description of the model
+    
+    Returns the generated model ID.
+    """
+    try:
+        # Extract parameters info from the script if possible
+        parameters = {
+            "github_url": "Source URL",
+            "imported": "Imported from GitHub"
+        }
+        
+        model_id = simulation_service.import_model_from_github(
+            github_url=github_url,
+            model_name=model_name,
+            description=description,
+            parameters=parameters
+        )
+        
+        return {
+            "status": "success",
+            "model_id": model_id,
+            "message": f"Successfully imported model from {github_url}",
+            "github_url": github_url,
+            "model_name": model_name
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to import from 
GitHub: {str(e)}")
+
+
[email protected]("/transform/github", summary="Transform GitHub script using 
transform_code")
+async def transform_github_script(
+    github_url: str,
+    model_name: str,
+    max_smoke_iters: int = 3
+):
+    """
+    Transform a GitHub script using the transform_code module.
+    
+    This endpoint uses ExternalScriptImporter to:
+    1. Import the script from GitHub
+    2. Refactor it to have a simulate(**params) function
+    3. Refine it through smoke testing and fixes
+    4. Return the model_id and metadata
+    
+    - **github_url**: URL to the GitHub script
+    - **model_name**: Name for the imported model
+    - **max_smoke_iters**: Maximum smoke test iterations (default: 3)
+    
+    Returns the generated model ID and processing details.
+    """
+    try:
+        print(f"[TRANSFORM API] Starting transform process for {github_url}")
+        from execute.loader.transform_code import ExternalScriptImporter
+        import tempfile
+        import os
+        
+        # Create importer
+        print("[TRANSFORM API] Creating ExternalScriptImporter...")
+        importer = ExternalScriptImporter()
+        
+        # Create temporary directory for processing
+        print(f"[TRANSFORM API] Creating temporary directory...")
+        with tempfile.TemporaryDirectory() as temp_dir:
+            print(f"[TRANSFORM API] Temporary directory created: {temp_dir}")
+            # Import and refactor using transform_code
+            print(f"[TRANSFORM API] Calling import_and_refactor...")
+            model_id, metadata = importer.import_and_refactor(
+                source_url=github_url,
+                model_name=model_name,
+                dest_dir=temp_dir,
+                max_smoke_iters=max_smoke_iters
+            )
+            
+            print(f"[TRANSFORM API] Import and refactor completed. Model ID: 
{model_id}")
+            # Get the final script path from the database
+            from db import get_simulation_path
+            try:
+                script_path = get_simulation_path(model_id)
+                print(f"[TRANSFORM API] Script path from database: 
{script_path}")
+            except:
+                # Fallback to expected path
+                script_path = f"external_models/{model_name}.py"
+                print(f"[TRANSFORM API] Using fallback script path: 
{script_path}")
+            
+            # Read the final script content
+            print(f"[TRANSFORM API] Reading script content...")
+            with open(script_path, 'r') as f:
+                script_content = f.read()
+            print(f"[TRANSFORM API] Script content length: 
{len(script_content)}")
+            
+            return {
+                "status": "success",
+                "model_id": model_id,
+                "message": f"Successfully transformed script from 
{github_url}",
+                "github_url": github_url,
+                "model_name": model_name,
+                "script_path": script_path,
+                "script_content": script_content,
+                "metadata": metadata,
+                "processing_details": {
+                    "max_smoke_iters": max_smoke_iters,
+                    "script_size": len(script_content),
+                    "temp_directory": temp_dir
+                }
+            }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to transform 
GitHub script: {str(e)}")
+
+
[email protected]("/run", response_model=SimulationResult, summary="Run single 
simulation")
+async def run_single_simulation(
+    request: SingleSimulationRequest,
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Execute a single simulation with given parameters.
+    
+    - **model_id**: ID of the simulation model
+    - **parameters**: Dictionary of simulation parameters
+    
+    Returns the simulation result with outputs and execution metadata.
+    """
+    try:
+        # Use the service layer
+        result = simulation_service.run_single_simulation(
+            model_id=request.model_id,
+            parameters=request.parameters.model_dump()
+        )
+        
+        # Convert to API response format
+        return SimulationResult(
+            success=result.status == SimulationStatus.COMPLETED,
+            parameters=result.parameters,
+            results=result.outputs,
+            execution_time=result.execution_time,
+            stdout=result.stdout,
+            stderr=result.stderr,
+            error_message=result.error_message
+        )
+        
+    except FileNotFoundError:
+        raise HTTPException(status_code=404, detail=f"Model {request.model_id} 
not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Simulation failed: 
{str(e)}")
+
+
[email protected]("/batch", response_model=BatchSimulationResponse, summary="Run 
batch simulations")
+async def run_batch_simulation(
+    request: BatchSimulationRequest,
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Execute multiple simulations in batch with different parameter sets.
+    
+    - **model_id**: ID of the simulation model
+    - **parameter_grid**: List of parameter dictionaries
+    
+    Returns batch execution results with statistics.
+    """
+    try:
+        start_time = time.time()
+        
+        # Convert parameter grid
+        param_grid = [params.model_dump() for params in request.parameter_grid]
+        
+        # Use the service layer
+        results = simulation_service.run_batch_simulations(
+            model_id=request.model_id,
+            parameter_grid=param_grid
+        )
+        
+        # Convert to API response format
+        api_results = []
+        for result in results:
+            api_result = SimulationResult(
+                success=result.status == SimulationStatus.COMPLETED,
+                parameters=result.parameters,
+                results=result.outputs,
+                execution_time=result.execution_time,
+                stdout=result.stdout,
+                stderr=result.stderr,
+                error_message=result.error_message
+            )
+            api_results.append(api_result)
+        
+        execution_time = time.time() - start_time
+        successful_runs = sum(1 for r in api_results if r.success)
+        failed_runs = len(api_results) - successful_runs
+        
+        return BatchSimulationResponse(
+            status="completed",
+            total_runs=len(api_results),
+            successful_runs=successful_runs,
+            failed_runs=failed_runs,
+            results=api_results,
+            execution_time=execution_time
+        )
+        
+    except FileNotFoundError:
+        raise HTTPException(status_code=404, detail=f"Model {request.model_id} 
not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Batch simulation failed: 
{str(e)}")
+
+
[email protected]("/models", summary="List available simulation models")
+async def list_models(simulation_service = Depends(get_simulation_service)):
+    """
+    Get a list of all available simulation models.
+    
+    Returns list of models with basic information.
+    """
+    try:
+        models = simulation_service.list_models()
+        return {
+            "status": "success",
+            "count": len(models),
+            "models": models
+        }
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to list models: 
{str(e)}")
+
+
@router.get("/models/search", summary="Search models by name (fuzzy search)")
async def search_models_by_name(
    name: str,
    limit: int = 20,
    simulation_service = Depends(get_simulation_service)
):
    """
    Search for simulation models by name using fuzzy matching.

    - **name**: Partial name to search for (case-insensitive)
    - **limit**: Maximum number of results to return (default: 20)

    Returns models that match the search criteria, ordered by relevance
    (exact match > prefix > substring > word-boundary match).
    """
    try:
        all_models = simulation_service.list_models()

        # Normalize the query once; the word list is shared by the filter
        # and the scorer so both apply the same matching rules.
        search_term = name.lower()
        words = search_term.split()

        def matches(model) -> bool:
            """True when the query hits the model's name or id."""
            model_name = model.get('name', '').lower()
            model_id = model.get('id', '').lower()
            return (search_term in model_name
                    or search_term in model_id
                    or any(w in model_name for w in words)
                    or any(w in model_id for w in words))

        def relevance_score(model) -> int:
            """Higher score == better match; used as the sort key."""
            model_name = model.get('name', '').lower()
            model_id = model.get('id', '').lower()
            if search_term == model_name or search_term == model_id:
                return 100
            if model_name.startswith(search_term) or model_id.startswith(search_term):
                return 90
            if search_term in model_name or search_term in model_id:
                return 80
            # Word-boundary match on either field. The filter accepts id-word
            # matches, so the scorer must recognize them too for a consistent
            # ordering (previously only name-word matches scored 70).
            if any(w in model_name for w in words) or any(w in model_id for w in words):
                return 70
            return 50

        matching_models = [m for m in all_models if matches(m)]

        # Stable sort keeps the service-provided order within equal scores.
        matching_models.sort(key=relevance_score, reverse=True)
        limited_models = matching_models[:limit]

        return {
            "status": "success",
            "search_term": name,
            "total_matches": len(matching_models),
            "returned_count": len(limited_models),
            "limit": limit,
            "models": limited_models,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to search models: {str(e)}")
+
+
@router.get("/models/{model_id}", summary="Get model information")
async def get_model_info(model_id: str, simulation_service = Depends(get_simulation_service)):
    """
    Get detailed information about a specific simulation model.

    - **model_id**: ID of the simulation model

    Returns model metadata and script information.
    """
    try:
        info = simulation_service.get_model_info(model_id)
        return {"status": "success", "model": info}
    except ValueError as e:
        # The service signals an unknown model id with ValueError -> 404.
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get model info: {str(e)}")
+
+
@router.get("/models/{model_id}/results", summary="Get simulation results")
async def get_model_results(
    model_id: str, 
    limit: int = 100,
    offset: int = 0,
    data_service = Depends(get_data_service)
):
    """
    Get simulation results for a specific model.

    - **model_id**: ID of the simulation model
    - **limit**: Maximum number of results to return (default: 100)
    - **offset**: Number of results to skip (default: 0)

    Returns paginated simulation results.
    """
    try:
        payload = data_service.get_simulation_results(
            model_id=model_id,
            limit=limit,
            offset=offset,
        )
        # Merge the service payload into the response envelope; any
        # service-provided keys win over the defaults, as before.
        response = {"status": "success"}
        response.update(payload)
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get results: {str(e)}")
+
+
@router.delete("/models/{model_id}/results", summary="Clear model results")
async def clear_model_results(model_id: str, db = Depends(get_database)):
    """
    Clear all simulation results for a specific model.

    - **model_id**: ID of the simulation model

    Returns confirmation of deletion.
    """
    try:
        # Parameterized DELETE; the connection context manager commits on exit.
        with db.config.get_sqlite_connection() as conn:
            deleted_count = conn.execute(
                "DELETE FROM results WHERE model_id = ?",
                (model_id,),
            ).rowcount

        return {
            "status": "success",
            "message": f"Deleted {deleted_count} results for model {model_id}",
            "deleted_count": deleted_count,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to clear results: {str(e)}")
+
+
[email protected]("/models/{model_id}/script", summary="Get model script")
+async def get_model_script(model_id: str, simulation_service = 
Depends(get_simulation_service)):
+    """
+    Get the refactored script for a specific model.
+    
+    - **model_id**: ID of the simulation model
+    
+    Returns the script content.
+    """
+    try:
+        model_info = simulation_service.get_model_info(model_id)
+        script_path = model_info.get("script_path")
+        
+        # Try to find the script in common locations if script_path is not 
available
+        if not script_path:
+            # Look for script in external_models directory
+            possible_paths = [
+                f"external_models/{model_id}.py",
+                f"external_models/{model_info.get('name', model_id)}.py",
+                f"systems/models/{model_id}.py",
+                f"systems/models/{model_info.get('name', model_id)}.py"
+            ]
+            
+            for path in possible_paths:
+                if Path(path).exists():

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/44)



##########
modules/research-framework/simexr_mod/code/utils/notebook_utils.py:
##########
@@ -0,0 +1,36 @@
+import nbformat
+from nbconvert import PythonExporter
+from pathlib import Path
+import shutil
+
+def notebook_to_script(notebook_path: str, output_dir: str = 
"external_models") -> str:
+    """
+    If `notebook_path` is a Jupyter notebook (.ipynb), convert it to a .py 
script
+    in `output_dir`, returning the script path.
+    If it's already a .py file, ensure it's in `output_dir` (copy if needed)
+    and return its path.
+    """
+    src = Path(notebook_path)
+    out_dir = Path(output_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    # Case 1: Already a Python script
+    if src.suffix.lower() == ".py":
+        dest = out_dir / src.name
+        # copy only if not already in the target dir
+        if src.resolve() != dest.resolve():
+            shutil.copy2(src, dest)
+        return str(dest)
+
+    # Case 2: Jupyter notebook → Python script
+    if src.suffix.lower() == ".ipynb":
+        nb = nbformat.read(src, as_version=4)
+        exporter = PythonExporter()
+        script_source, _ = exporter.from_notebook_node(nb)
+
+        py_path = out_dir / (src.stem + ".py")
+        py_path.write_text(script_source)

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/42)



##########
modules/research-framework/simexr_mod/code/utils/notebook_utils.py:
##########
@@ -0,0 +1,36 @@
+import nbformat
+from nbconvert import PythonExporter
+from pathlib import Path
+import shutil
+
+def notebook_to_script(notebook_path: str, output_dir: str = 
"external_models") -> str:
+    """
+    If `notebook_path` is a Jupyter notebook (.ipynb), convert it to a .py 
script
+    in `output_dir`, returning the script path.
+    If it's already a .py file, ensure it's in `output_dir` (copy if needed)
+    and return its path.
+    """
+    src = Path(notebook_path)
+    out_dir = Path(output_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    # Case 1: Already a Python script
+    if src.suffix.lower() == ".py":
+        dest = out_dir / src.name
+        # copy only if not already in the target dir
+        if src.resolve() != dest.resolve():
+            shutil.copy2(src, dest)

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/40)



##########
modules/research-framework/simexr_mod/code/utils/notebook_utils.py:
##########
@@ -0,0 +1,36 @@
+import nbformat
+from nbconvert import PythonExporter
+from pathlib import Path
+import shutil
+
+def notebook_to_script(notebook_path: str, output_dir: str = 
"external_models") -> str:
+    """
+    If `notebook_path` is a Jupyter notebook (.ipynb), convert it to a .py 
script
+    in `output_dir`, returning the script path.
+    If it's already a .py file, ensure it's in `output_dir` (copy if needed)
+    and return its path.
+    """
+    src = Path(notebook_path)
+    out_dir = Path(output_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    # Case 1: Already a Python script
+    if src.suffix.lower() == ".py":
+        dest = out_dir / src.name
+        # copy only if not already in the target dir
+        if src.resolve() != dest.resolve():

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/39)



##########
modules/research-framework/simexr_mod/code/utils/github_utils.py:
##########
@@ -0,0 +1,26 @@
+import requests
+from pathlib import Path
+
+def fetch_notebook_from_github(github_url: str, dest_dir: str = 
"external_models") -> str:
+    """
+    Downloads a file from a GitHub URL and saves it locally.
+    Handles both raw URLs and blob URLs.
+    Returns the local path to the saved file.
+    """
+    # Convert GitHub blob URL to raw URL if needed
+    if "github.com" in github_url and "/blob/" in github_url:

Review Comment:
   ## Incomplete URL substring sanitization
   
   The string [github.com](1) may be at an arbitrary position in the sanitized 
URL.
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/32)



##########
modules/research-framework/simexr_mod/api/routers/simulation.py:
##########
@@ -0,0 +1,510 @@
+"""
+Simulation execution API endpoints.
+"""
+
+import time
+from typing import List
+from fastapi import APIRouter, HTTPException, Depends, Request
+from pathlib import Path
+
+from ..models import (
+    SingleSimulationRequest, BatchSimulationRequest, SimulationResult,
+    BatchSimulationResponse, StatusResponse, ErrorResponse
+)
+from ..dependencies import get_simulation_service, get_data_service, 
get_database
+from core.interfaces import SimulationStatus
+
+
+router = APIRouter()
+
+
@router.post("/import/github", summary="Import simulation from GitHub")
async def import_from_github(
    github_url: str,
    model_name: str,
    description: str = "",
    simulation_service = Depends(get_simulation_service)
):
    """
    Import a simulation model from a GitHub URL.

    - **github_url**: URL to the GitHub script (e.g., https://github.com/user/repo/blob/main/script.py)
    - **model_name**: Name for the imported model
    - **description**: Optional description of the model

    Returns the generated model ID.
    """
    try:
        # Placeholder parameter metadata recorded alongside the import.
        parameters = {
            "github_url": "Source URL",
            "imported": "Imported from GitHub",
        }

        new_model_id = simulation_service.import_model_from_github(
            github_url=github_url,
            model_name=model_name,
            description=description,
            parameters=parameters,
        )

        return {
            "status": "success",
            "model_id": new_model_id,
            "message": f"Successfully imported model from {github_url}",
            "github_url": github_url,
            "model_name": model_name,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to import from GitHub: {str(e)}")
+
+
@router.post("/transform/github", summary="Transform GitHub script using transform_code")
async def transform_github_script(
    github_url: str,
    model_name: str,
    max_smoke_iters: int = 3
):
    """
    Transform a GitHub script using the transform_code module.

    This endpoint uses ExternalScriptImporter to:
    1. Import the script from GitHub
    2. Refactor it to have a simulate(**params) function
    3. Refine it through smoke testing and fixes
    4. Return the model_id and metadata

    - **github_url**: URL to the GitHub script
    - **model_name**: Name for the imported model
    - **max_smoke_iters**: Maximum smoke test iterations (default: 3)

    Returns the generated model ID and processing details.
    """
    try:
        print(f"[TRANSFORM API] Starting transform process for {github_url}")
        from execute.loader.transform_code import ExternalScriptImporter
        import tempfile

        print("[TRANSFORM API] Creating ExternalScriptImporter...")
        importer = ExternalScriptImporter()

        # All intermediate artifacts live in a throwaway directory.
        print(f"[TRANSFORM API] Creating temporary directory...")
        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"[TRANSFORM API] Temporary directory created: {temp_dir}")
            print(f"[TRANSFORM API] Calling import_and_refactor...")
            model_id, metadata = importer.import_and_refactor(
                source_url=github_url,
                model_name=model_name,
                dest_dir=temp_dir,
                max_smoke_iters=max_smoke_iters
            )

            print(f"[TRANSFORM API] Import and refactor completed. Model ID: {model_id}")
            # Prefer the registered path from the database.
            from db import get_simulation_path
            try:
                script_path = get_simulation_path(model_id)
                print(f"[TRANSFORM API] Script path from database: {script_path}")
            except Exception:
                # was a bare `except:`, which also swallowed SystemExit /
                # KeyboardInterrupt. Fallback path uses only the basename of
                # the user-supplied model_name so a crafted value (e.g.
                # "../../etc/passwd") cannot escape external_models/.
                safe_name = Path(model_name).name
                script_path = f"external_models/{safe_name}.py"
                print(f"[TRANSFORM API] Using fallback script path: {script_path}")

            print(f"[TRANSFORM API] Reading script content...")
            with open(script_path, 'r') as f:
                script_content = f.read()
            print(f"[TRANSFORM API] Script content length: {len(script_content)}")

            return {
                "status": "success",
                "model_id": model_id,
                "message": f"Successfully transformed script from {github_url}",
                "github_url": github_url,
                "model_name": model_name,
                "script_path": script_path,
                "script_content": script_content,
                "metadata": metadata,
                "processing_details": {
                    "max_smoke_iters": max_smoke_iters,
                    "script_size": len(script_content),
                    "temp_directory": temp_dir
                }
            }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to transform GitHub script: {str(e)}")
+
+
@router.post("/run", response_model=SimulationResult, summary="Run single simulation")
async def run_single_simulation(
    request: SingleSimulationRequest,
    simulation_service = Depends(get_simulation_service)
):
    """
    Execute a single simulation with given parameters.

    - **model_id**: ID of the simulation model
    - **parameters**: Dictionary of simulation parameters

    Returns the simulation result with outputs and execution metadata.
    """
    try:
        # Delegate execution to the service layer.
        outcome = simulation_service.run_single_simulation(
            model_id=request.model_id,
            parameters=request.parameters.model_dump(),
        )

        # Translate the internal result object into the API schema.
        return SimulationResult(
            success=(outcome.status == SimulationStatus.COMPLETED),
            parameters=outcome.parameters,
            results=outcome.outputs,
            execution_time=outcome.execution_time,
            stdout=outcome.stdout,
            stderr=outcome.stderr,
            error_message=outcome.error_message,
        )

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Model {request.model_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Simulation failed: {str(e)}")
+
+
@router.post("/batch", response_model=BatchSimulationResponse, summary="Run batch simulations")
async def run_batch_simulation(
    request: BatchSimulationRequest,
    simulation_service = Depends(get_simulation_service)
):
    """
    Execute multiple simulations in batch with different parameter sets.

    - **model_id**: ID of the simulation model
    - **parameter_grid**: List of parameter dictionaries

    Returns batch execution results with statistics.
    """
    try:
        started = time.time()

        # Materialize the parameter grid as plain dicts for the service layer.
        grid = [p.model_dump() for p in request.parameter_grid]

        raw_results = simulation_service.run_batch_simulations(
            model_id=request.model_id,
            parameter_grid=grid,
        )

        # Map each internal result onto the API schema.
        api_results = [
            SimulationResult(
                success=(r.status == SimulationStatus.COMPLETED),
                parameters=r.parameters,
                results=r.outputs,
                execution_time=r.execution_time,
                stdout=r.stdout,
                stderr=r.stderr,
                error_message=r.error_message,
            )
            for r in raw_results
        ]

        elapsed = time.time() - started
        ok_count = sum(1 for r in api_results if r.success)

        return BatchSimulationResponse(
            status="completed",
            total_runs=len(api_results),
            successful_runs=ok_count,
            failed_runs=len(api_results) - ok_count,
            results=api_results,
            execution_time=elapsed,
        )

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Model {request.model_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch simulation failed: {str(e)}")
+
+
@router.get("/models", summary="List available simulation models")
async def list_models(simulation_service = Depends(get_simulation_service)):
    """
    Get a list of all available simulation models.

    Returns list of models with basic information.
    """
    try:
        available = simulation_service.list_models()
        # Shape the payload for API consumers: status flag plus count + rows.
        return {
            "status": "success",
            "count": len(available),
            "models": available,
        }
    except Exception as exc:
        # Surface any service-layer failure as a generic 500.
        raise HTTPException(status_code=500, detail=f"Failed to list models: {str(exc)}")
+
+
@router.get("/models/search", summary="Search models by name (fuzzy search)")
async def search_models_by_name(
    name: str,
    limit: int = 20,
    simulation_service = Depends(get_simulation_service)
):
    """
    Search for simulation models by name using fuzzy matching.

    - **name**: Partial name to search for (case-insensitive)
    - **limit**: Maximum number of results to return (default: 20)

    Returns models that match the search criteria, ordered by relevance
    (exact match > prefix > substring > word-boundary match).
    """
    try:
        all_models = simulation_service.list_models()

        # Normalize the query once; the word list is shared by the filter
        # and the scorer so both apply the same matching rules.
        search_term = name.lower()
        words = search_term.split()

        def matches(model) -> bool:
            """True when the query hits the model's name or id."""
            model_name = model.get('name', '').lower()
            model_id = model.get('id', '').lower()
            return (search_term in model_name
                    or search_term in model_id
                    or any(w in model_name for w in words)
                    or any(w in model_id for w in words))

        def relevance_score(model) -> int:
            """Higher score == better match; used as the sort key."""
            model_name = model.get('name', '').lower()
            model_id = model.get('id', '').lower()
            if search_term == model_name or search_term == model_id:
                return 100
            if model_name.startswith(search_term) or model_id.startswith(search_term):
                return 90
            if search_term in model_name or search_term in model_id:
                return 80
            # Word-boundary match on either field. The filter accepts id-word
            # matches, so the scorer must recognize them too for a consistent
            # ordering (previously only name-word matches scored 70).
            if any(w in model_name for w in words) or any(w in model_id for w in words):
                return 70
            return 50

        matching_models = [m for m in all_models if matches(m)]

        # Stable sort keeps the service-provided order within equal scores.
        matching_models.sort(key=relevance_score, reverse=True)
        limited_models = matching_models[:limit]

        return {
            "status": "success",
            "search_term": name,
            "total_matches": len(matching_models),
            "returned_count": len(limited_models),
            "limit": limit,
            "models": limited_models,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to search models: {str(e)}")
+
+
@router.get("/models/{model_id}", summary="Get model information")
async def get_model_info(model_id: str, simulation_service = Depends(get_simulation_service)):
    """
    Get detailed information about a specific simulation model.

    - **model_id**: ID of the simulation model

    Returns model metadata and script information.
    """
    try:
        info = simulation_service.get_model_info(model_id)
        return {"status": "success", "model": info}
    except ValueError as e:
        # The service signals an unknown model id with ValueError -> 404.
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get model info: {str(e)}")
+
+
@router.get("/models/{model_id}/results", summary="Get simulation results")
async def get_model_results(
    model_id: str, 
    limit: int = 100,
    offset: int = 0,
    data_service = Depends(get_data_service)
):
    """
    Get simulation results for a specific model.

    - **model_id**: ID of the simulation model
    - **limit**: Maximum number of results to return (default: 100)
    - **offset**: Number of results to skip (default: 0)

    Returns paginated simulation results.
    """
    try:
        payload = data_service.get_simulation_results(
            model_id=model_id,
            limit=limit,
            offset=offset,
        )
        # Merge the service payload into the response envelope; any
        # service-provided keys win over the defaults, as before.
        response = {"status": "success"}
        response.update(payload)
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get results: {str(e)}")
+
+
@router.delete("/models/{model_id}/results", summary="Clear model results")
async def clear_model_results(model_id: str, db = Depends(get_database)):
    """
    Clear all simulation results for a specific model.

    - **model_id**: ID of the simulation model

    Returns confirmation of deletion.
    """
    try:
        # Parameterized DELETE; the connection context manager commits on exit.
        with db.config.get_sqlite_connection() as conn:
            deleted_count = conn.execute(
                "DELETE FROM results WHERE model_id = ?",
                (model_id,),
            ).rowcount

        return {
            "status": "success",
            "message": f"Deleted {deleted_count} results for model {model_id}",
            "deleted_count": deleted_count,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to clear results: {str(e)}")
+
+
[email protected]("/models/{model_id}/script", summary="Get model script")
+async def get_model_script(model_id: str, simulation_service = 
Depends(get_simulation_service)):
+    """
+    Get the refactored script for a specific model.
+    
+    - **model_id**: ID of the simulation model
+    
+    Returns the script content.
+    """
+    try:
+        model_info = simulation_service.get_model_info(model_id)
+        script_path = model_info.get("script_path")
+        
+        # Try to find the script in common locations if script_path is not 
available
+        if not script_path:
+            # Look for script in external_models directory
+            possible_paths = [
+                f"external_models/{model_id}.py",
+                f"external_models/{model_info.get('name', model_id)}.py",
+                f"systems/models/{model_id}.py",
+                f"systems/models/{model_info.get('name', model_id)}.py"
+            ]
+            
+            for path in possible_paths:
+                if Path(path).exists():
+                    script_path = path
+                    break
+        
+        if not script_path or not Path(script_path).exists():
+            raise HTTPException(status_code=404, detail=f"Script not found for 
model {model_id}")
+        
+        # Read the script file
+        with open(script_path, 'r') as f:

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/46)



##########
modules/research-framework/simexr_mod/code/refactor/llm_refactor.py:
##########
@@ -0,0 +1,99 @@
+import re
+import os
+from pathlib import Path
+from typing import Any, Tuple
+
+import openai
+from code.extract.llm_extract import extract_script_settings  # assumes this 
is defined elsewhere
+
+
+def refactor_to_single_entry(
+    script_path: Path,
+    entry_fn: str = "simulate",
+    llm_model: str = "gpt-5-mini",
+    max_attempts: int = 3
+) -> Tuple[Path, Any]:
+    """
+    Refactors a full Python simulation script into a single function 
`simulate(**params)`
+    which overrides all internally defined parameters and returns a dict.
+    Uses an agentic retry loop to recover from malformed generations.
+    """
+    # Ensure OpenAI API key is configured globally
+    try:
+        from utils.openai_config import ensure_openai_api_key
+        ensure_openai_api_key()
+        print("[LLM_REFACTOR] OpenAI API key configured globally")
+    except Exception as e:
+        print(f"[LLM_REFACTOR] Warning: Could not configure OpenAI API key: 
{e}")
+    
+    print(f"[LLM_REFACTOR] Starting refactor_to_single_entry for 
{script_path}")
+    original_source = script_path.read_text().strip()

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/35)



##########
modules/research-framework/simexr_mod/code/refactor/llm_refactor.py:
##########
@@ -0,0 +1,99 @@
+import re
+import os
+from pathlib import Path
+from typing import Any, Tuple
+
+import openai
+from code.extract.llm_extract import extract_script_settings  # assumes this 
is defined elsewhere
+
+
+def refactor_to_single_entry(
+    script_path: Path,
+    entry_fn: str = "simulate",
+    llm_model: str = "gpt-5-mini",
+    max_attempts: int = 3
+) -> Tuple[Path, Any]:
+    """
+    Refactors a full Python simulation script into a single function 
`simulate(**params)`
+    which overrides all internally defined parameters and returns a dict.
+    Uses an agentic retry loop to recover from malformed generations.
+    """
+    # Ensure OpenAI API key is configured globally
+    try:
+        from utils.openai_config import ensure_openai_api_key
+        ensure_openai_api_key()
+        print("[LLM_REFACTOR] OpenAI API key configured globally")
+    except Exception as e:
+        print(f"[LLM_REFACTOR] Warning: Could not configure OpenAI API key: 
{e}")
+    
+    print(f"[LLM_REFACTOR] Starting refactor_to_single_entry for 
{script_path}")
+    original_source = script_path.read_text().strip()
+    print(f"[LLM_REFACTOR] Original source length: {len(original_source)}")
+
+    def build_prompt(source_code: str) -> str:
+        return (
+            f"""
+        You are a helpful **code-refactoring assistant**.
+        
+        Your task: Take the entire Python script below and refactor it into a 
single function:
+        
+            def {entry_fn}(**params):
+        
+        Requirements for the new function:
+        - Inline all helper functions if needed.
+        - Return **one dictionary** of results with Python built-in datatypes.
+        - Override all internally defined constants/globals with values from 
`params` if keys exist.
+        - Contain **no top-level code** and **no extra function definitions**.
+        - Must behave as a self-contained black box that depends *only* on its 
parameters.
+        - Catch common issues like indentation and variable scope errors.
+        - Ensure the data types for all variable are type checked and 
converted incase of unexpected type inputs.
+        
+        If initial condition values are missing from `params`, make an 
intelligent guess.
+        
+        Return ONLY the **Python source code** for the new function (no 
markdown, no explanations).
+        
+        --- Original script ---
+        ```python
+        {source_code}```
+        """)
+
+
+    def is_valid_python(source: str) -> bool:
+        try:
+            compile(source, "<string>", "exec")
+            return True
+        except SyntaxError:
+            return False
+
+    for attempt in range(1, max_attempts + 1):
+        print(f"[LLM_REFACTOR] [Attempt {attempt}] Refactoring script into 
`{entry_fn}(**params)`...")
+
+        prompt = build_prompt(original_source)
+        print(f"[LLM_REFACTOR] Prompt length: {len(prompt)}")
+        
+        print(f"[LLM_REFACTOR] Making OpenAI API call...")
+        resp = openai.chat.completions.create(
+            model=llm_model,
+            messages=[
+                {"role": "system", "content": "You are a code transformation 
assistant."},
+                {"role": "user", "content": prompt},
+            ],
+            # temperature=0.0,
+        )
+        print(f"[LLM_REFACTOR] OpenAI API call completed")
+
+        content = resp.choices[0].message.content.strip()
+
+        # Clean code fences
+        new_src = re.sub(r"^```python\s*", "", content)
+        new_src = re.sub(r"```$", "", new_src).strip()
+
+        if is_valid_python(new_src):
+            script_path.write_text(new_src)

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/36)



##########
modules/research-framework/simexr_mod/code/extract/llm_extract.py:
##########
@@ -0,0 +1,131 @@
+from typing import Any, Dict
+
+from core.parser import tidy_json
+
+from pathlib import Path
+import json, re
+import openai
+import os
+
+def extract_script_settings(
+        script_path: str,
+        llm_model: str = "gpt-5-mini",
+        retries: int = 4
+) -> Dict[str, Any]:
+    """
+    Return a flat settings dict: name -> default (float for 
numerics/fractions; else original).
+    Uses gpt-5-mini by default. Robust to malformed LLM output.
+    """
+    # Ensure OpenAI API key is configured globally
+    try:
+        from utils.openai_config import ensure_openai_api_key
+        ensure_openai_api_key()
+        print("OpenAI API key configured globally in llm_extract")
+    except Exception as e:
+        print(f"Warning: Could not configure OpenAI API key in llm_extract: 
{e}")
+    
+    code = Path(script_path).read_text()

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/37)



##########
modules/research-framework/simexr_mod/api/routers/simulation.py:
##########
@@ -0,0 +1,510 @@
+"""
+Simulation execution API endpoints.
+"""
+
+import time
+from typing import List
+from fastapi import APIRouter, HTTPException, Depends, Request
+from pathlib import Path
+
+from ..models import (
+    SingleSimulationRequest, BatchSimulationRequest, SimulationResult,
+    BatchSimulationResponse, StatusResponse, ErrorResponse
+)
+from ..dependencies import get_simulation_service, get_data_service, 
get_database
+from core.interfaces import SimulationStatus
+
+
+router = APIRouter()
+
+
@router.post("/import/github", summary="Import simulation from GitHub")
async def import_from_github(
    github_url: str,
    model_name: str,
    description: str = "",
    simulation_service = Depends(get_simulation_service)
):
    """
    Import a simulation model from a GitHub URL.

    - **github_url**: URL to the GitHub script (e.g.,
      https://github.com/user/repo/blob/main/script.py)
    - **model_name**: Name for the imported model
    - **description**: Optional description of the model

    Returns the generated model ID.

    Raises:
        HTTPException: 500 when the service-layer import fails.
    """
    try:
        # Placeholder parameter metadata; real parameter extraction is
        # delegated to the service layer.
        placeholder_params = {
            "github_url": "Source URL",
            "imported": "Imported from GitHub",
        }

        new_model_id = simulation_service.import_model_from_github(
            github_url=github_url,
            model_name=model_name,
            description=description,
            parameters=placeholder_params,
        )

        return {
            "status": "success",
            "model_id": new_model_id,
            "message": f"Successfully imported model from {github_url}",
            "github_url": github_url,
            "model_name": model_name,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to import from GitHub: {str(e)}")
+
+
@router.post("/transform/github", summary="Transform GitHub script using transform_code")
async def transform_github_script(
    github_url: str,
    model_name: str,
    max_smoke_iters: int = 3
):
    """
    Transform a GitHub script using the transform_code module.

    This endpoint uses ExternalScriptImporter to:
    1. Import the script from GitHub
    2. Refactor it to have a simulate(**params) function
    3. Refine it through smoke testing and fixes
    4. Return the model_id and metadata

    - **github_url**: URL to the GitHub script
    - **model_name**: Name for the imported model
    - **max_smoke_iters**: Maximum smoke test iterations (default: 3)

    Returns the generated model ID and processing details.

    Raises:
        HTTPException: 500 when the import/refactor pipeline fails.
    """
    try:
        print(f"[TRANSFORM API] Starting transform process for {github_url}")
        from execute.loader.transform_code import ExternalScriptImporter
        import tempfile  # FIX: dropped unused `import os`

        # Create importer
        print("[TRANSFORM API] Creating ExternalScriptImporter...")
        importer = ExternalScriptImporter()

        # Process inside a throwaway directory that is cleaned up automatically.
        print(f"[TRANSFORM API] Creating temporary directory...")
        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"[TRANSFORM API] Temporary directory created: {temp_dir}")
            # Import and refactor using transform_code
            print(f"[TRANSFORM API] Calling import_and_refactor...")
            model_id, metadata = importer.import_and_refactor(
                source_url=github_url,
                model_name=model_name,
                dest_dir=temp_dir,
                max_smoke_iters=max_smoke_iters
            )

            print(f"[TRANSFORM API] Import and refactor completed. Model ID: {model_id}")
            # Get the final script path from the database
            from db import get_simulation_path
            try:
                script_path = get_simulation_path(model_id)
                print(f"[TRANSFORM API] Script path from database: {script_path}")
            except Exception:
                # FIX: was a bare `except:`, which also swallowed SystemExit and
                # KeyboardInterrupt.  Also sanitize the user-supplied model_name
                # to its final path component before building a filesystem path
                # (code scanning: uncontrolled data used in path expression).
                safe_name = Path(model_name).name
                script_path = f"external_models/{safe_name}.py"
                print(f"[TRANSFORM API] Using fallback script path: {script_path}")

            # Read the final script content
            print(f"[TRANSFORM API] Reading script content...")
            with open(script_path, 'r') as f:
                script_content = f.read()
            print(f"[TRANSFORM API] Script content length: {len(script_content)}")

            return {
                "status": "success",
                "model_id": model_id,
                "message": f"Successfully transformed script from {github_url}",
                "github_url": github_url,
                "model_name": model_name,
                "script_path": script_path,
                "script_content": script_content,
                "metadata": metadata,
                "processing_details": {
                    "max_smoke_iters": max_smoke_iters,
                    "script_size": len(script_content),
                    "temp_directory": temp_dir
                }
            }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to transform GitHub script: {str(e)}")
+
+
@router.post("/run", response_model=SimulationResult, summary="Run single simulation")
async def run_single_simulation(
    request: SingleSimulationRequest,
    simulation_service = Depends(get_simulation_service)
):
    """
    Execute a single simulation with given parameters.

    - **model_id**: ID of the simulation model
    - **parameters**: Dictionary of simulation parameters

    Returns the simulation result with outputs and execution metadata.

    Raises:
        HTTPException: 404 when the model is unknown, 500 on execution failure.
    """
    try:
        outcome = simulation_service.run_single_simulation(
            model_id=request.model_id,
            parameters=request.parameters.model_dump(),
        )

        # Map the service-layer result onto the API response schema.
        payload = {
            "success": outcome.status == SimulationStatus.COMPLETED,
            "parameters": outcome.parameters,
            "results": outcome.outputs,
            "execution_time": outcome.execution_time,
            "stdout": outcome.stdout,
            "stderr": outcome.stderr,
            "error_message": outcome.error_message,
        }
        return SimulationResult(**payload)

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Model {request.model_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Simulation failed: {str(e)}")
+
+
@router.post("/batch", response_model=BatchSimulationResponse, summary="Run batch simulations")
async def run_batch_simulation(
    request: BatchSimulationRequest,
    simulation_service = Depends(get_simulation_service)
):
    """
    Execute multiple simulations in batch with different parameter sets.

    - **model_id**: ID of the simulation model
    - **parameter_grid**: List of parameter dictionaries

    Returns batch execution results with statistics.

    Raises:
        HTTPException: 404 when the model is unknown, 500 on execution failure.
    """
    try:
        started = time.time()

        # Flatten the pydantic parameter models into plain dicts for the service.
        grid = [entry.model_dump() for entry in request.parameter_grid]

        raw_results = simulation_service.run_batch_simulations(
            model_id=request.model_id,
            parameter_grid=grid,
        )

        # Convert every service-layer result into the API schema.
        converted = [
            SimulationResult(
                success=r.status == SimulationStatus.COMPLETED,
                parameters=r.parameters,
                results=r.outputs,
                execution_time=r.execution_time,
                stdout=r.stdout,
                stderr=r.stderr,
                error_message=r.error_message,
            )
            for r in raw_results
        ]

        elapsed = time.time() - started
        ok_count = sum(1 for r in converted if r.success)

        return BatchSimulationResponse(
            status="completed",
            total_runs=len(converted),
            successful_runs=ok_count,
            failed_runs=len(converted) - ok_count,
            results=converted,
            execution_time=elapsed,
        )

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Model {request.model_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch simulation failed: {str(e)}")
+
+
@router.get("/models", summary="List available simulation models")
async def list_models(simulation_service = Depends(get_simulation_service)):
    """
    Get a list of all available simulation models.

    Returns list of models with basic information.

    Raises:
        HTTPException: 500 when the service layer fails.
    """
    try:
        available = simulation_service.list_models()
        return {
            "status": "success",
            "count": len(available),
            "models": available,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list models: {str(e)}")
+
+
@router.get("/models/search", summary="Search models by name (fuzzy search)")
async def search_models_by_name(
    name: str,
    limit: int = 20,
    simulation_service = Depends(get_simulation_service)
):
    """
    Search for simulation models by name using fuzzy matching.

    - **name**: Partial name to search for (case-insensitive)
    - **limit**: Maximum number of results to return (default: 20)

    Returns models that match the search criteria, best matches first.

    Raises:
        HTTPException: 500 when the service layer fails.
    """
    try:
        # FIX: removed an unused `import re` — nothing here uses regex.
        all_models = simulation_service.list_models()

        # Lowercase once; split into words once instead of per model.
        search_term = name.lower()
        search_words = search_term.split()

        def _matches(model) -> bool:
            # A model matches when the whole term, or any word of it,
            # appears in its (lowercased) name or ID.
            model_name = model.get('name', '').lower()
            model_id = model.get('id', '').lower()
            return (search_term in model_name or
                    search_term in model_id or
                    any(word in model_name for word in search_words) or
                    any(word in model_id for word in search_words))

        matching_models = [m for m in all_models if _matches(m)]

        # Sort by relevance (exact matches first, then partial matches)
        def relevance_score(model) -> int:
            model_name = model.get('name', '').lower()
            model_id = model.get('id', '').lower()

            if search_term == model_name or search_term == model_id:
                return 100  # exact match
            elif model_name.startswith(search_term) or model_id.startswith(search_term):
                return 90   # prefix match
            elif search_term in model_name or search_term in model_id:
                return 80   # substring match
            # FIX: word-boundary scoring now also inspects the model ID, matching
            # the filter above (previously an ID-word match fell through to 50).
            elif (any(word in model_name for word in search_words) or
                  any(word in model_id for word in search_words)):
                return 70
            else:
                return 50

        # Sort by relevance and limit results
        matching_models.sort(key=relevance_score, reverse=True)
        limited_models = matching_models[:limit]

        return {
            "status": "success",
            "search_term": name,
            "total_matches": len(matching_models),
            "returned_count": len(limited_models),
            "limit": limit,
            "models": limited_models
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to search models: {str(e)}")
+
+
@router.get("/models/{model_id}", summary="Get model information")
async def get_model_info(model_id: str, simulation_service = Depends(get_simulation_service)):
    """
    Get detailed information about a specific simulation model.

    - **model_id**: ID of the simulation model

    Returns model metadata and script information.

    Raises:
        HTTPException: 404 when the model is unknown, 500 on other failures.
    """
    try:
        info = simulation_service.get_model_info(model_id)
        return {"status": "success", "model": info}
    except ValueError as e:
        # Service signals "unknown model" with ValueError.
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get model info: {str(e)}")
+
+
@router.get("/models/{model_id}/results", summary="Get simulation results")
async def get_model_results(
    model_id: str,
    limit: int = 100,
    offset: int = 0,
    data_service = Depends(get_data_service)
):
    """
    Get simulation results for a specific model.

    - **model_id**: ID of the simulation model
    - **limit**: Maximum number of results to return (default: 100)
    - **offset**: Number of results to skip (default: 0)

    Returns paginated simulation results.

    Raises:
        HTTPException: 500 when the data service fails.
    """
    try:
        page = data_service.get_simulation_results(
            model_id=model_id,
            limit=limit,
            offset=offset,
        )
        # Merge the service payload directly into the response envelope.
        return {"status": "success", **page}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get results: {str(e)}")
+
+
@router.delete("/models/{model_id}/results", summary="Clear model results")
async def clear_model_results(model_id: str, db = Depends(get_database)):
    """
    Clear all simulation results for a specific model.

    - **model_id**: ID of the simulation model

    Returns confirmation of deletion.

    Raises:
        HTTPException: 500 when the database operation fails.
    """
    try:
        # Parameterized DELETE — model_id is bound, never interpolated.
        with db.config.get_sqlite_connection() as conn:
            removed = conn.execute(
                "DELETE FROM results WHERE model_id = ?",
                (model_id,)
            ).rowcount

        return {
            "status": "success",
            "message": f"Deleted {removed} results for model {model_id}",
            "deleted_count": removed,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to clear results: {str(e)}")
+
+
@router.get("/models/{model_id}/script", summary="Get model script")
async def get_model_script(model_id: str, simulation_service = Depends(get_simulation_service)):
    """
    Get the refactored script for a specific model.

    - **model_id**: ID of the simulation model

    Returns the script content.

    Raises:
        HTTPException: 404 when no script can be located, 500 on other errors.
    """
    try:
        model_info = simulation_service.get_model_info(model_id)
        script_path = model_info.get("script_path")

        # Try to find the script in common locations if script_path is not available
        if not script_path:
            # FIX (code scanning: uncontrolled data used in path expression):
            # keep only the final path component of user-influenced values so
            # "../" sequences or absolute paths cannot escape the expected
            # directories when building candidate paths.
            safe_id = Path(model_id).name
            safe_name = Path(model_info.get('name', model_id)).name
            possible_paths = [
                f"external_models/{safe_id}.py",
                f"external_models/{safe_name}.py",
                f"systems/models/{safe_id}.py",
                f"systems/models/{safe_name}.py"
            ]

            for path in possible_paths:
                if Path(path).exists():
                    script_path = path
                    break

        if not script_path or not Path(script_path).exists():
            raise HTTPException(status_code=404, detail=f"Script not found for model {model_id}")

        # Read the script file
        script_content = Path(script_path).read_text()

        return {
            "status": "success",
            "model_id": model_id,
            "script": script_content,
            "script_path": script_path,
            "is_placeholder": False
        }

    except HTTPException:
        # Let deliberate 404s pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get script: {str(e)}")
+
+
[email protected]("/models/{model_id}/script", summary="Save model script")
+async def save_model_script(
+    model_id: str, 
+    script_data: dict,
+    simulation_service = Depends(get_simulation_service)
+):
+    """
+    Save the modified script for a specific model.
+    
+    - **model_id**: ID of the simulation model
+    - **script_data**: Dictionary containing the script content
+    
+    Returns confirmation of save.
+    """
+    try:
+        model_info = simulation_service.get_model_info(model_id)
+        script_path = model_info.get("script_path")
+        
+        # If no script path exists, create one in external_models directory
+        if not script_path:
+            script_path = f"external_models/{model_id}.py"
+            # Ensure the directory exists
+            Path("external_models").mkdir(exist_ok=True)
+        
+        script_content = script_data.get("script")
+        if not script_content:
+            raise HTTPException(status_code=400, detail="Script content is 
required")
+        
+        # Write the script to file
+        with open(script_path, 'w') as f:

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/47)



##########
modules/research-framework/simexr_mod/code/utils/github_utils.py:
##########
@@ -0,0 +1,26 @@
+import requests
+from pathlib import Path
+
+def fetch_notebook_from_github(github_url: str, dest_dir: str = 
"external_models") -> str:
+    """
+    Downloads a file from a GitHub URL and saves it locally.
+    Handles both raw URLs and blob URLs.
+    Returns the local path to the saved file.
+    """
+    # Convert GitHub blob URL to raw URL if needed
+    if "github.com" in github_url and "/blob/" in github_url:
+        raw_url = github_url.replace("github.com", 
"raw.githubusercontent.com").replace("/blob/", "/")
+    else:
+        raw_url = github_url
+    
+    print(f"[GITHUB_UTILS] Converting {github_url} to {raw_url}")
+    
+    resp = requests.get(raw_url)
+    resp.raise_for_status()
+
+    Path(dest_dir).mkdir(exist_ok=True, parents=True)
+    filename = Path(raw_url).name
+    local_path = Path(dest_dir) / filename
+    local_path.write_bytes(resp.content)

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/34)



##########
modules/research-framework/simexr_mod/code/utils/notebook_utils.py:
##########
@@ -0,0 +1,36 @@
+import nbformat
+from nbconvert import PythonExporter
+from pathlib import Path
+import shutil
+
+def notebook_to_script(notebook_path: str, output_dir: str = 
"external_models") -> str:
+    """
+    If `notebook_path` is a Jupyter notebook (.ipynb), convert it to a .py 
script
+    in `output_dir`, returning the script path.
+    If it's already a .py file, ensure it's in `output_dir` (copy if needed)
+    and return its path.
+    """
+    src = Path(notebook_path)
+    out_dir = Path(output_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    # Case 1: Already a Python script
+    if src.suffix.lower() == ".py":
+        dest = out_dir / src.name
+        # copy only if not already in the target dir
+        if src.resolve() != dest.resolve():
+            shutil.copy2(src, dest)

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/41)



##########
modules/research-framework/simexr_mod/code/utils/notebook_utils.py:
##########
@@ -0,0 +1,36 @@
+import nbformat
+from nbconvert import PythonExporter
+from pathlib import Path
+import shutil
+
+def notebook_to_script(notebook_path: str, output_dir: str = 
"external_models") -> str:
+    """
+    If `notebook_path` is a Jupyter notebook (.ipynb), convert it to a .py 
script
+    in `output_dir`, returning the script path.
+    If it's already a .py file, ensure it's in `output_dir` (copy if needed)
+    and return its path.
+    """
+    src = Path(notebook_path)
+    out_dir = Path(output_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    # Case 1: Already a Python script
+    if src.suffix.lower() == ".py":
+        dest = out_dir / src.name
+        # copy only if not already in the target dir
+        if src.resolve() != dest.resolve():

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/38)



##########
modules/research-framework/simexr_mod/core/patterns.py:
##########
@@ -0,0 +1,686 @@
+"""
+Implementation of key design patterns for SimExR.
+
+This module provides concrete implementations of various design patterns
+to improve code organization and maintainability.
+"""
+
+import threading
+import weakref
+import logging
+from abc import ABC, abstractmethod
+from typing import Dict, Any, List, Optional, Type, Callable, Union
+from pathlib import Path
+from dataclasses import dataclass, field
+from enum import Enum
+import time
+import uuid
+
+from .interfaces import (
+    ISimulationRunner, ISimulationLoader, IResultStore, IReasoningAgent,
+    IEventListener, IExecutionStrategy, IModelAdapter, IResourceManager,
+    SimulationRequest, SimulationResult, SimulationStatus
+)
+
+
+# ===== FACTORY PATTERN =====
+
class ComponentType(Enum):
    """Types of components that can be created by the factory."""
    # String values double as stable identifiers, safe to log or persist.
    SIMULATION_RUNNER = "simulation_runner"
    RESULT_STORE = "result_store"
    REASONING_AGENT = "reasoning_agent"
    MODEL_LOADER = "model_loader"
    EXECUTION_STRATEGY = "execution_strategy"
+
+
class SimulationFactory:
    """Factory for creating simulation-related components.

    Creator callables are registered per ComponentType; instances can be
    created fresh or cached by an explicit instance id.
    """

    def __init__(self):
        # Registered creator callables, keyed by component type.
        self._creators: Dict[ComponentType, Callable] = {}
        # Cache of singleton instances, keyed by caller-chosen id.
        self._instances: Dict[str, Any] = {}

    def register_creator(self, component_type: ComponentType, creator: Callable):
        """Register a creator function for a component type."""
        self._creators[component_type] = creator

    def create(self, component_type: ComponentType, **kwargs) -> Any:
        """Create a component of the specified type.

        Raises:
            ValueError: if no creator was registered for *component_type*.
        """
        if component_type not in self._creators:
            raise ValueError(f"No creator registered for {component_type}")
        return self._creators[component_type](**kwargs)

    def create_singleton(self, component_type: ComponentType, instance_id: str, **kwargs) -> Any:
        """Create or retrieve a singleton instance."""
        # Build lazily on first request; later calls reuse the cached object.
        if instance_id not in self._instances:
            self._instances[instance_id] = self.create(component_type, **kwargs)
        return self._instances[instance_id]

    def get_registered_types(self) -> List[ComponentType]:
        """Get list of registered component types."""
        return list(self._creators)
+
+
+# ===== STRATEGY PATTERN =====
+
class LocalExecutionStrategy:
    """Strategy for executing simulations locally.

    Resolves a model's script path from the database, runs it through
    SimulationRunner, and wraps the raw result dict in a SimulationResult.
    """
    
    def __init__(self, timeout: float = 30.0):
        # NOTE(review): timeout is stored but never applied inside execute();
        # confirm whether SimulationRunner enforces it elsewhere.
        self.timeout = timeout
        self.logger = logging.getLogger("LocalExecutionStrategy")
        self.logger.setLevel(logging.INFO)
    
    def execute(self, request: SimulationRequest) -> SimulationResult:
        """Execute simulation locally.

        Never raises: any exception is converted into a FAILED
        SimulationResult carrying the error message.
        """
        self.logger.info(f"[LOCAL_EXECUTION] Starting local execution for model {request.model_id}")
        self.logger.info(f"[LOCAL_EXECUTION] Parameters: {request.parameters}")
        self.logger.info(f"[LOCAL_EXECUTION] Timeout: {self.timeout}s")
        
        start_time = time.time()
        
        try:
            # Import here to avoid circular dependencies
            from execute.run.simulation_runner import SimulationRunner
            from db import get_simulation_path
            
            self.logger.info(f"[LOCAL_EXECUTION] Getting simulation path for model {request.model_id}")
            script_path = Path(get_simulation_path(request.model_id))
            self.logger.info(f"[LOCAL_EXECUTION] Script path: {script_path}")
            
            self.logger.info(f"[LOCAL_EXECUTION] Creating SimulationRunner")
            runner = SimulationRunner()
            
            self.logger.info(f"[LOCAL_EXECUTION] Running simulation with runner")
            result = runner.run(script_path, request.parameters)
            
            execution_time = time.time() - start_time
            self.logger.info(f"[LOCAL_EXECUTION] Simulation completed in {execution_time:.3f}s")
            
            # The runner signals success via the "_ok" sentinel key.
            success = result.get("_ok", False)
            self.logger.info(f"[LOCAL_EXECUTION] Success status: {success}")
            
            # Log result preview
            if success:
                self.logger.info(f"[LOCAL_EXECUTION] Creating successful SimulationResult")
                self._log_final_result_preview(result)
            else:
                self.logger.warning(f"[LOCAL_EXECUTION] Creating failed SimulationResult")
            
            # Underscore-prefixed keys are runner metadata, not simulation outputs.
            return SimulationResult(
                status=SimulationStatus.COMPLETED if success else SimulationStatus.FAILED,
                parameters=request.parameters,
                outputs={k: v for k, v in result.items() if not k.startswith("_")},
                execution_time=execution_time,
                stdout=result.get("_stdout", ""),
                stderr=result.get("_stderr", ""),
                error_message=result.get("_error_msg") if not success else None
            )
            
        except Exception as e:
            execution_time = time.time() - start_time
            self.logger.error(f"[LOCAL_EXECUTION] Execution failed after {execution_time:.3f}s: {str(e)}")
            self.logger.error(f"[LOCAL_EXECUTION] Error type: {type(e).__name__}")
            
            return SimulationResult(
                status=SimulationStatus.FAILED,
                parameters=request.parameters,
                outputs={},
                execution_time=execution_time,
                error_message=str(e)
            )
    
    def can_handle(self, request: SimulationRequest) -> bool:
        """Check if this strategy can handle the request."""
        return True  # Local execution can handle any request
    
    def get_priority(self) -> int:
        """Get priority (lower = higher priority)."""
        return 10
    
    def _log_final_result_preview(self, result: Dict[str, Any]) -> None:
        """Log a preview of the final simulation results."""
        self.logger.info(f"[LOCAL_EXECUTION] === FINAL RESULT SUMMARY ===")
        
        # Show key metrics
        if 'success' in result:
            self.logger.info(f"[LOCAL_EXECUTION] Success: {result['success']}")
        
        if 'solver_message' in result:
            self.logger.info(f"[LOCAL_EXECUTION] Solver: {result['solver_message']}")
        
        # Show data sizes
        for key in ['t', 'x', 'y']:
            if key in result and isinstance(result[key], (list, tuple)):
                self.logger.info(f"[LOCAL_EXECUTION] {key.upper()} data points: {len(result[key])}")
        
        # Show grid info if available (only for nested list/tuple grids)
        for key in ['x_grid', 'y_grid', 'u_grid', 'v_grid']:
            if key in result and isinstance(result[key], (list, tuple)):
                if len(result[key]) > 0 and isinstance(result[key][0], (list, tuple)):
                    self.logger.info(f"[LOCAL_EXECUTION] {key.upper()} grid: {len(result[key])}x{len(result[key][0])}")
        
        # Show key parameters
        for key in ['mu', 'z0', 'eval_time', 't_iteration', 'grid_points', 'mgrid_size']:
            if key in result:
                self.logger.info(f"[LOCAL_EXECUTION] {key}: {result[key]}")
        
        self.logger.info(f"[LOCAL_EXECUTION] === END FINAL RESULT SUMMARY ===")
+
+
class RemoteExecutionStrategy:
    """Strategy for executing simulations remotely (placeholder)."""
    
    def __init__(self, endpoint: str):
        # Remote service endpoint; stored but unused until execute() is implemented.
        self.endpoint = endpoint
    
    def execute(self, request: SimulationRequest) -> SimulationResult:
        """Execute simulation remotely."""
        # Placeholder implementation
        raise NotImplementedError("Remote execution not yet implemented")
    
    def can_handle(self, request: SimulationRequest) -> bool:
        """Check if this strategy can handle the request."""
        # Always declines, so the strategy manager never routes work here yet.
        return False  # Not implemented yet
    
    def get_priority(self) -> int:
        """Get priority."""
        return 5  # Higher priority than local if available
+
+
class ExecutionStrategyManager:
    """Manages different execution strategies.

    Strategies are kept sorted by priority; execution is delegated to the
    first (highest-priority) strategy that accepts the request.
    """

    def __init__(self):
        self.strategies: List[IExecutionStrategy] = []

    def add_strategy(self, strategy: IExecutionStrategy):
        """Add an execution strategy and re-sort by priority (lower = higher)."""
        self.strategies.append(strategy)
        self.strategies.sort(key=lambda s: s.get_priority())

    def execute(self, request: SimulationRequest) -> SimulationResult:
        """Execute using the best available strategy.

        Raises:
            RuntimeError: if no registered strategy can handle *request*.
        """
        chosen = next((s for s in self.strategies if s.can_handle(request)), None)
        if chosen is None:
            raise RuntimeError("No execution strategy available for this request")
        return chosen.execute(request)
+
+
+# ===== OBSERVER PATTERN =====
+
class SimulationEvent:
    """Event data for simulation notifications."""
    
    def __init__(self, event_type: str, data: Dict[str, Any]):
        """
        Args:
            event_type: Label describing what happened.
            data: Arbitrary payload associated with the event.
        """
        self.event_type = event_type
        self.data = data
        # Creation time lets observers order and age events.
        self.timestamp = time.time()
+
+
class SimulationSubject:
    """Observable that broadcasts simulation lifecycle events to listeners."""

    def __init__(self):
        self._observers: List[IEventListener] = []

    def attach(self, observer: IEventListener):
        """Subscribe an observer unless it is already registered."""
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: IEventListener):
        """Unsubscribe an observer if currently registered."""
        if observer in self._observers:
            self._observers.remove(observer)

    def _broadcast(self, method_name: str, *args):
        """Invoke *method_name* on every observer, isolating listener failures."""
        for observer in self._observers:
            try:
                getattr(observer, method_name)(*args)
            except Exception as e:
                # One misbehaving observer must not block the others.
                print(f"Observer notification failed: {e}")

    def notify_started(self, request: SimulationRequest):
        """Tell all observers a simulation has begun."""
        self._broadcast("on_simulation_started", request)

    def notify_completed(self, result: SimulationResult):
        """Tell all observers a simulation has finished."""
        self._broadcast("on_simulation_completed", result)

    def notify_failed(self, request: SimulationRequest, error: Exception):
        """Tell all observers a simulation has failed."""
        self._broadcast("on_simulation_failed", request, error)
+
+
class LoggingObserver:
    """Observer that records simulation lifecycle events to a file or stdout."""

    def __init__(self, log_file: Optional[Path] = None):
        # When None, messages are printed instead of appended to a file.
        self.log_file = log_file

    def on_simulation_started(self, request: SimulationRequest):
        """Record the start of a simulation run."""
        self._log(f"Simulation started: {request.model_id} with params {request.parameters}")

    def on_simulation_completed(self, result: SimulationResult):
        """Record a finished simulation and how long it took."""
        self._log(f"Simulation completed: {result.status.value} in {result.execution_time:.2f}s")

    def on_simulation_failed(self, request: SimulationRequest, error: Exception):
        """Record a failed simulation together with its error."""
        self._log(f"Simulation failed: {request.model_id} - {str(error)}")

    def _log(self, message: str):
        """Prefix *message* with a timestamp and emit it to the configured sink."""
        stamped = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}"
        if self.log_file:
            with open(self.log_file, 'a') as f:
                f.write(stamped + "\n")
        else:
            print(stamped)
+
+
+# ===== COMMAND PATTERN =====
+
class Command(ABC):
    """Base class for undoable operations (Command pattern)."""

    @abstractmethod
    def execute(self) -> Any:
        """Perform the operation and return its result."""
        ...

    @abstractmethod
    def undo(self) -> Any:
        """Reverse the operation's effects, when possible."""
        ...
+
+
class RunSimulationCommand(Command):
    """Command wrapping a single simulation run."""

    def __init__(self, runner: ISimulationRunner, request: SimulationRequest):
        self.runner = runner
        self.request = request
        # Populated by execute(); stays None until the simulation has run.
        self.result: Optional[SimulationResult] = None

    def execute(self) -> SimulationResult:
        """Run the simulation, cache its result, and return it."""
        self.result = self.runner.run(self.request)
        return self.result

    def undo(self) -> None:
        """A completed simulation run cannot be rolled back; this is a no-op."""
+
+
class StoreModelCommand(Command):
    """Command that persists a simulation script to the model store."""

    def __init__(self, model_name: str, metadata: Dict[str, Any], script_content: str):
        self.model_name = model_name
        self.metadata = metadata
        self.script_content = script_content
        # Set by execute(); undo() would need it to locate the stored model.
        self.model_id: Optional[str] = None

    def execute(self) -> str:
        """Write the script to a temp file, store it, and return the model id.

        Returns:
            The id assigned by the model store.
        """
        from db import store_simulation_script
        import tempfile

        # The store API takes a path, so the script must hit disk first.
        # Pin the encoding to UTF-8 so non-ASCII script content does not
        # depend on the platform's default text encoding.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        ) as f:
            f.write(self.script_content)
            temp_path = f.name

        try:
            self.model_id = store_simulation_script(
                model_name=self.model_name,
                metadata=self.metadata,
                script_path=temp_path
            )
            return self.model_id
        finally:
            # Always remove the temp file, even when storing fails.
            Path(temp_path).unlink(missing_ok=True)

    def undo(self) -> None:
        """Delete the stored model (not yet implemented)."""
        if self.model_id:
            # Implementation would delete the model from database
            pass
+
+
class CommandInvoker:
    """Runs commands and remembers them so the most recent can be undone."""

    def __init__(self):
        self.history: List[Command] = []

    def execute_command(self, command: Command) -> Any:
        """Execute *command*, push it onto the history, and return its result."""
        outcome = command.execute()
        self.history.append(command)
        return outcome

    def undo_last(self) -> None:
        """Pop and undo the most recent command; no-op on empty history."""
        if not self.history:
            return
        self.history.pop().undo()
+
+
+# ===== BUILDER PATTERN =====
+
class SimulationConfigBuilder:
    """Fluent builder that assembles a simulation configuration dict."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Restore the builder to a pristine state; returns self for chaining."""
        self._config = {
            'model_id': None,
            'parameters': {},
            'execution_options': {},
            'validation_rules': [],
            'observers': [],
            'strategies': []
        }
        return self

    def set_model(self, model_id: str):
        """Choose which simulation model the configuration targets."""
        self._config['model_id'] = model_id
        return self

    def add_parameter(self, name: str, value: Any):
        """Attach a single simulation parameter."""
        return self.add_parameters({name: value})

    def add_parameters(self, parameters: Dict[str, Any]):
        """Merge several simulation parameters at once."""
        self._config['parameters'].update(parameters)
        return self

    def set_execution_option(self, name: str, value: Any):
        """Record an arbitrary execution option."""
        self._config['execution_options'][name] = value
        return self

    def set_timeout(self, timeout: float):
        """Shorthand for the 'timeout' execution option."""
        return self.set_execution_option('timeout', timeout)

    def set_priority(self, priority: int):
        """Shorthand for the 'priority' execution option."""
        return self.set_execution_option('priority', priority)

    def add_observer(self, observer: IEventListener):
        """Attach an event observer to the configuration."""
        self._config['observers'].append(observer)
        return self

    def add_strategy(self, strategy: IExecutionStrategy):
        """Attach an execution strategy to the configuration."""
        self._config['strategies'].append(strategy)
        return self

    def build(self) -> Dict[str, Any]:
        """Finalize and return the configuration, then reset the builder.

        Raises:
            ValueError: when no model id has been set.
        """
        if not self._config['model_id']:
            raise ValueError("Model ID is required")

        finished = self._config.copy()
        self.reset()
        return finished
+
+
+# ===== SINGLETON PATTERN =====
+
class SingletonMeta(type):
    """Metaclass guaranteeing at most one instance per class (thread-safe)."""

    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Fast path: the instance already exists, no locking needed.
        instance = cls._instances.get(cls)
        if instance is None:
            # Double-checked locking: re-test under the lock so two racing
            # threads cannot both construct an instance.
            with cls._lock:
                instance = cls._instances.get(cls)
                if instance is None:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return instance
+
+
class ResourceManager(metaclass=SingletonMeta):
    """Singleton registry for lazily created, shared resources."""

    def __init__(self):
        # Guard against re-initialization wiping the registry if __init__
        # is ever invoked on the shared instance again.
        if hasattr(self, '_initialized'):
            return

        self._resources: Dict[str, Any] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._initialized = True

    def get_resource(self, resource_id: str, factory: Callable = None) -> Any:
        """Return the resource for *resource_id*, building it via *factory* on first use.

        Raises:
            ValueError: when the resource is missing and no factory was given.
        """
        if resource_id in self._resources:
            return self._resources[resource_id]

        if factory is None:
            raise ValueError(f"Resource {resource_id} not found and no factory provided")

        # One lock per resource so unrelated factories never serialize.
        lock = self._locks.setdefault(resource_id, threading.Lock())
        with lock:
            # Re-check under the lock: another thread may have built it.
            if resource_id not in self._resources:
                self._resources[resource_id] = factory()

        return self._resources[resource_id]

    def set_resource(self, resource_id: str, resource: Any):
        """Register *resource* under *resource_id*, replacing any existing entry."""
        self._resources[resource_id] = resource

    def release_resource(self, resource_id: str):
        """Drop a resource, invoking its cleanup() hook when it has one."""
        resource = self._resources.pop(resource_id, None)
        if hasattr(resource, 'cleanup'):
            resource.cleanup()

    def cleanup_all(self):
        """Release every registered resource."""
        for resource_id in list(self._resources):
            self.release_resource(resource_id)
+
+
+# ===== ADAPTER PATTERN =====
+
+class GitHubScriptAdapter:
+    """Adapter for importing scripts from GitHub."""
+    
+    def __init__(self):
+        self.supported_formats = ["github_url", "raw_github_url"]
+    
+    def can_adapt(self, source_format: str, target_format: str) -> bool:
+        """Check if adapter can handle the conversion."""
+        return (source_format in self.supported_formats and 
+                target_format == "simexr_script")
+    
+    def adapt(self, source: str, source_format: str, target_format: str) -> 
str:
+        """Convert GitHub URL to SimExR script format."""
+        if not self.can_adapt(source_format, target_format):
+            raise ValueError(f"Cannot adapt from {source_format} to 
{target_format}")
+        
+        if source_format == "github_url":
+            # Convert GitHub URL to raw URL
+            raw_url = self._github_url_to_raw(source)
+        else:
+            raw_url = source
+        
+        # Download the script content
+        import requests
+        response = requests.get(raw_url)
+        response.raise_for_status()
+        
+        script_content = response.text
+        
+        # Adapt to SimExR format (ensure it has a simulate function)
+        if "def simulate(" not in script_content:
+            script_content = self._wrap_as_simulate_function(script_content)
+        
+        return script_content
+    
+    def _github_url_to_raw(self, github_url: str) -> str:
+        """Convert GitHub URL to raw content URL."""
+        if "github.com" in github_url and "/blob/" in github_url:

Review Comment:
   ## Incomplete URL substring sanitization
   
   The string [github.com](1) may be at an arbitrary position in the sanitized 
URL.
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/33)



##########
modules/research-framework/simexr_mod/utils/openai_config.py:
##########
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+"""
+OpenAI API Key Configuration Manager
+
+This module ensures the OpenAI API key is properly set and available 
throughout the application.
+"""
+
+import os
+import openai
+from .config import settings
+
def ensure_openai_api_key():
    """
    Ensure the OpenAI API key is set globally.

    Reads the key from application settings, then propagates it to both the
    process environment and the ``openai`` module. Call at startup and
    whenever the API key needs to be refreshed.

    Raises:
        ValueError: if no key is present in the configuration.
    """
    # Get API key from config
    api_key = settings.openai_api_key

    if not api_key:
        raise ValueError("No OpenAI API key found in configuration")

    # Set in environment
    os.environ["OPENAI_API_KEY"] = api_key

    # Set in openai module
    openai.api_key = api_key

    # Security: never log any portion of the key — even a short prefix is
    # sensitive material (flagged by CodeQL as clear-text logging of secrets).
    print("🔑 OpenAI API key configured globally")

Review Comment:
   ## Clear-text logging of sensitive information
   
   This expression logs [sensitive data (password)](1) as clear text.
   
   [Show more 
details](https://github.com/apache/airavata/security/code-scanning/31)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to