This is an automated email from the ASF dual-hosted git repository.

DImuthuUpe pushed a commit to branch usage-monitor
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git


The following commit(s) were added to refs/heads/usage-monitor by this push:
     new 808bd69f0 Adding slurm monitor and fetching job list api
808bd69f0 is described below

commit 808bd69f0af45c4859f7952459da797adc4f21cd
Author: DImuthuUpe <[email protected]>
AuthorDate: Tue May 26 10:03:48 2026 -0400

    Adding slurm monitor and fetching job list api
---
 .../SLURM/Association-Mapper/pkg/smapper/loader.go |  2 +-
 connectors/SLURM/Rest-Client/pkg/client/client.go  |  3 +
 connectors/SLURM/Rest-Client/pkg/client/jobs.go    | 58 +++++++++++++++
 .../pkg/client/jobs_integration_test.go            | 82 ++++++++++++++++++++++
 connectors/SLURM/Rest-Client/pkg/client/types.go   | 58 +++++++++++++++
 .../Usage-Monitor/internal/smonitor/smonitor.go    | 25 +++++++
 .../pkg/monitor}/loader.go                         | 18 +++--
 internal/connectors/loader.go                      |  7 ++
 8 files changed, 242 insertions(+), 11 deletions(-)

diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go 
b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 9eee18aef..66aba8e6a 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -8,8 +8,8 @@ import (
 
        "github.com/jmoiron/sqlx"
 
-       
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
        
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
+       
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
diff --git a/connectors/SLURM/Rest-Client/pkg/client/client.go 
b/connectors/SLURM/Rest-Client/pkg/client/client.go
index 6886db8ab..85eacf017 100644
--- a/connectors/SLURM/Rest-Client/pkg/client/client.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/client.go
@@ -6,6 +6,7 @@ import (
        "encoding/json"
        "fmt"
        "io"
+       "log"
        "net/http"
        "strings"
        "time"
@@ -37,6 +38,8 @@ func (c *Client) do(method, path string, body any, out any) 
(*http.Response, err
                        return nil, err
                }
                reqBody = bytes.NewReader(buf)
+
+               log.Printf("Request body: %s", string(buf))
        }
        req, err := http.NewRequest(method, c.baseURL+path, reqBody)
        if err != nil {
diff --git a/connectors/SLURM/Rest-Client/pkg/client/jobs.go 
b/connectors/SLURM/Rest-Client/pkg/client/jobs.go
new file mode 100644
index 000000000..270b026e3
--- /dev/null
+++ b/connectors/SLURM/Rest-Client/pkg/client/jobs.go
@@ -0,0 +1,58 @@
+package client
+
+/*
+curl -s -X GET \
+  "http://localhost:6820/slurmdb/v0.0.41/jobs" \
+  -H "X-SLURM-USER-NAME: root" \
+  -H "X-SLURM-USER-TOKEN: $SLURM_JWT" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "users": ["root"],
+    "start_time": {
+      "set": true,
+      "infinite": false,
+      "number": 1746057600
+    }
+  }'
+*/
+
+import (
+       "fmt"
+)
+
+type jobsResponse struct {
+       Jobs []JobInfo `json:"jobs"`
+}
+
+type JobFilter struct {
+       Users     []string     `json:"users,omitempty"`
+       StartTime *slurmNumber `json:"start_time,omitempty"`
+       EndTime   *slurmNumber `json:"end_time,omitempty"`
+}
+
+func (c *Client) ListJobs(filter JobFilter) ([]JobInfo, error) {
+       var out jobsResponse
+       if _, err := c.do("GET", "/slurmdb/v0.0."+c.apiVersion+"/jobs", filter, 
&out); err != nil {
+               return nil, err
+       }
+       return out.Jobs, nil
+}
+
+func (c *Client) GetJob(id int64) (*JobInfo, error) {
+       var out jobsResponse
+       if _, err := c.do("GET", fmt.Sprintf("/slurmdb/v0.0.%s/job/%d", 
c.apiVersion, id), nil, &out); err != nil {
+               return nil, err
+       }
+       if len(out.Jobs) == 0 {
+               return nil, fmt.Errorf("job %d not found", id)
+       }
+       return &out.Jobs[0], nil
+}
+
+func (c *Client) SubmitJob(request JobSubmitRequest) (*JobSubmitResponse, 
error) {
+       var out JobSubmitResponse
+       if _, err := c.do("POST", fmt.Sprintf("/slurm/v0.0.%s/job/submit", 
c.apiVersion), request, &out); err != nil {
+               return nil, err
+       }
+       return &out, nil
+}
diff --git a/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go 
b/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
new file mode 100644
index 000000000..8b68950ea
--- /dev/null
+++ b/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
@@ -0,0 +1,82 @@
+package client
+
+import (
+       "log"
+       "os"
+       "testing"
+       "time"
+)
+
+func TestListJobs(t *testing.T) {
+       if !IsLocalSlurmConfigAvailable() {
+               t.Skip("Skipping integration test for listing jobs because local SLURM config is not available")
+       }
+
+       apiUrl := "http://localhost:6820"
+       user := os.Getenv("TEST_SLURM_USER")
+       token := os.Getenv("TEST_SLURM_TOKEN")
+       apiVersion := os.Getenv("TEST_SLURM_API_VERSION")
+
+       client := New(apiUrl, user, token, apiVersion)
+
+       JobSubmitRequest := JobSubmitRequest{
+               JobSubmitParam: JobSubmitParam{
+                       Name:        "test-job",
+                       Account:     "root",
+                       Partition:   "compute",
+                       Tasks:       1,
+                       CpusPerTask: 1,
+                       TimeLimit: slurmNumber{
+                               Set:      true,
+                               Infinite: false,
+                               Number:   20, // 10 seconds
+                       },
+                       CurrentWorkingDir: "/home/testuser",
+                       Environment: []string{
+                               "TEST_ENV_VAR=test_value",
+                       },
+                       // You can set other job parameters here if needed
+               },
+               Script: "#!/bin/bash\nsleep 1", // Simple script that sleeps for 1 second
+       }
+
+       currentTime := time.Now().Unix()
+       totalJobsToSubmit := 12
+       for i := 0; i < totalJobsToSubmit; i++ {
+               resp, err := client.SubmitJob(JobSubmitRequest)
+               if err != nil {
+                       t.Fatalf("Failed to submit job: %v", err)
+               }
+
+               if resp.JobID == 0 {
+                       t.Fatalf("Invalid job ID returned from job submission: %d", resp.JobID)
+               }
+
+               log.Printf("Submitted job with ID: %d", resp.JobID)
+       }
+
+       sleepDuration := 20 // seconds
+       log.Printf("Sleeping for %d seconds to allow job to start...", 
sleepDuration)
+       time.Sleep(time.Duration(sleepDuration) * time.Second)
+
+       filter := JobFilter{
+               // You can set filter parameters here if needed
+               Users:     []string{"root"},
+               StartTime: &slurmNumber{Set: true, Infinite: false, Number: 
currentTime},
+       }
+
+       jobs, err := client.ListJobs(filter)
+       if err != nil {
+               t.Fatalf("Failed to list jobs: %v", err)
+       }
+       if len(jobs) == 0 {
+               t.Log("No jobs found")
+       } else {
+               t.Logf("Found %d jobs", len(jobs))
+               if len(jobs) != totalJobsToSubmit {
+                       t.Logf("Expected at %d jobs, but found %d", 
totalJobsToSubmit, len(jobs))
+               } else {
+                       t.Log("Successfully found all submitted jobs")
+               }
+       }
+}
diff --git a/connectors/SLURM/Rest-Client/pkg/client/types.go 
b/connectors/SLURM/Rest-Client/pkg/client/types.go
index 3863b77a7..9909c9887 100644
--- a/connectors/SLURM/Rest-Client/pkg/client/types.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/types.go
@@ -184,3 +184,61 @@ func (a *Association) UnmarshalJSON(data []byte) error {
        }
        return nil
 }
+
+type JobTime struct {
+       Elapsed    int64 `json:"elapsed"`
+       Eligible   int64 `json:"eligible"`
+       End        int64 `json:"end"`
+       Start      int64 `json:"start"`
+       Submission int64 `json:"submission"`
+       Suspended  int64 `json:"suspended"`
+}
+
+type JobTresInfo struct {
+       Allocated []TRES `json:"allocated,omitempty"`
+       Requested []TRES `json:"requested,omitempty"`
+}
+
+type JobExitInfo struct {
+       Status     []string    `json:"status"`
+       ReturnCode slurmNumber `json:"return_code"`
+}
+
+type JobInfo struct {
+       Account         string      `json:"account"`
+       Cluster         string      `json:"cluster"`
+       Time            JobTime     `json:"time"`
+       JobID           int64       `json:"job_id"`
+       Name            string      `json:"name"`
+       Partition       string      `json:"partition"`
+       QoS             string      `json:"qos"`
+       User            string      `json:"user"`
+       Nodes           string      `json:"nodes"`
+       Tres            JobTresInfo `json:"tres"`
+       ExitCode        JobExitInfo `json:"exit_code"`
+       DerivedExitCode JobExitInfo `json:"derived_exit_code"`
+}
+
+type JobSubmitParam struct {
+       Account           string      `json:"account"`
+       Partition         string      `json:"partition,omitempty"`
+       QoS               string      `json:"qos,omitempty"`
+       Name              string      `json:"name,omitempty"`
+       Tasks             int64       `json:"tasks,omitempty"`
+       CurrentWorkingDir string      `json:"current_working_directory,omitempty"`
+       Environment       []string    `json:"environment,omitempty"`
+       CpusPerTask       int64       `json:"cpus_per_task,omitempty"`
+       Memory            int64       `json:"memory,omitempty"`
+       TimeLimit         slurmNumber `json:"time_limit,omitempty"` // seconds
+}
+
+type JobSubmitRequest struct {
+       JobSubmitParam JobSubmitParam `json:"job"`
+       Script         string         `json:"script"`
+}
+
+type JobSubmitResponse struct {
+       JobID            int64  `json:"job_id"`
+       StepID           string `json:"step_id"`
+       JobSubmitUserMsg string `json:"job_submit_user_msg"`
+}
diff --git a/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go 
b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
new file mode 100644
index 000000000..fdb79d049
--- /dev/null
+++ b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
@@ -0,0 +1,25 @@
+package smonitor
+
+import (
+       
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/service"
+)
+
+type SlurmMonitor struct {
+       slurmClient *client.Client
+       eventBus    *events.Bus
+       coreService service.CoreService
+}
+
+func NewSlurmMonitor(slurmClient *client.Client, eventBus *events.Bus, 
coreService service.CoreService) *SlurmMonitor {
+       return &SlurmMonitor{
+               slurmClient: slurmClient,
+               eventBus:    eventBus,
+               coreService: coreService,
+       }
+}
+
+func (m *SlurmMonitor) StartMonitor() {
+
+}
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go 
b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
similarity index 79%
copy from connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
copy to connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
index 9eee18aef..f53933e8c 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
@@ -1,17 +1,15 @@
-package smapper
+package monitor
 
 import (
        "context"
-       "log/slog"
-       "os"
-       "sync"
-
-       "github.com/jmoiron/sqlx"
-
        
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
-       
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
+       
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/internal/smonitor"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
+       "github.com/jmoiron/sqlx"
+       "log/slog"
+       "os"
+       "sync"
 )
 
 func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup) error {
@@ -22,12 +20,12 @@ func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus 
*events.Bus, coreServ
        token := os.Getenv("SLURM_TOKEN")
        apiVersion := os.Getenv("SLURM_API_VERSION")
        if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
-               slog.Info("SLURM API credentials not fully provided, skipping SLURM Association Mapper connector")
+               slog.Info("SLURM API credentials not fully provided, skipping SLURM Usage Monitor connector")
                slog.Info("SLURM API credentials", "apiUrl", apiUrl, "user", 
user, "token", token, "apiVersion", apiVersion)
                return nil
        }
 
        slurmClient := client.New(apiUrl, user, token, apiVersion)
-       subscribers.NewAssociationSubscriber(slurmClient, eventBus, 
coreService).RegisterSubscribers()
+       go smonitor.NewSlurmMonitor(slurmClient, eventBus, 
coreService).StartMonitor()
        return nil
 }
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 99fb221c0..4bcd0510d 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -26,6 +26,7 @@ import (
 
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/pkg/amie"
        
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
+       
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/pkg/monitor"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -45,6 +46,12 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, 
eventBus *events.Bus
                return err
        }
 
+       slog.Info("loading SLURM Usage Monitor connector")
+       if err := monitor.LoadConnector(ctx, database, eventBus, coreService, 
wg); err != nil {
+               slog.Error("failed to load SLURM Usage Monitor connector", 
"error", err)
+               return err
+       }
+
        slog.Info("finished loading connectors")
        return nil
 }

Reply via email to