This is an automated email from the ASF dual-hosted git repository.
DImuthuUpe pushed a commit to branch usage-monitor
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
The following commit(s) were added to refs/heads/usage-monitor by this push:
new 808bd69f0 Adding slurm monitor and fetching job list api
808bd69f0 is described below
commit 808bd69f0af45c4859f7952459da797adc4f21cd
Author: DImuthuUpe <[email protected]>
AuthorDate: Tue May 26 10:03:48 2026 -0400
Adding slurm monitor and fetching job list api
---
.../SLURM/Association-Mapper/pkg/smapper/loader.go | 2 +-
connectors/SLURM/Rest-Client/pkg/client/client.go | 3 +
connectors/SLURM/Rest-Client/pkg/client/jobs.go | 58 +++++++++++++++
.../pkg/client/jobs_integration_test.go | 82 ++++++++++++++++++++++
connectors/SLURM/Rest-Client/pkg/client/types.go | 58 +++++++++++++++
.../Usage-Monitor/internal/smonitor/smonitor.go | 25 +++++++
.../pkg/monitor}/loader.go | 18 +++--
internal/connectors/loader.go | 7 ++
8 files changed, 242 insertions(+), 11 deletions(-)
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 9eee18aef..66aba8e6a 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -8,8 +8,8 @@ import (
"github.com/jmoiron/sqlx"
-
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
)
diff --git a/connectors/SLURM/Rest-Client/pkg/client/client.go
b/connectors/SLURM/Rest-Client/pkg/client/client.go
index 6886db8ab..85eacf017 100644
--- a/connectors/SLURM/Rest-Client/pkg/client/client.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/client.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
+ "log"
"net/http"
"strings"
"time"
@@ -37,6 +38,8 @@ func (c *Client) do(method, path string, body any, out any)
(*http.Response, err
return nil, err
}
reqBody = bytes.NewReader(buf)
+
+ log.Printf("Request body: %s", string(buf))
}
req, err := http.NewRequest(method, c.baseURL+path, reqBody)
if err != nil {
diff --git a/connectors/SLURM/Rest-Client/pkg/client/jobs.go
b/connectors/SLURM/Rest-Client/pkg/client/jobs.go
new file mode 100644
index 000000000..270b026e3
--- /dev/null
+++ b/connectors/SLURM/Rest-Client/pkg/client/jobs.go
@@ -0,0 +1,58 @@
+package client
+
+/*
+curl -s -X GET \
+ "http://localhost:6820/slurmdb/v0.0.41/jobs" \
+ -H "X-SLURM-USER-NAME: root" \
+ -H "X-SLURM-USER-TOKEN: $SLURM_JWT" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "users": ["root"],
+ "start_time": {
+ "set": true,
+ "infinite": false,
+ "number": 1746057600
+ }
+ }'
+*/
+
+import (
+ "fmt"
+)
+
+type jobsResponse struct {
+ Jobs []JobInfo `json:"jobs"`
+}
+
+type JobFilter struct {
+ Users []string `json:"users,omitempty"`
+ StartTime *slurmNumber `json:"start_time,omitempty"`
+ EndTime *slurmNumber `json:"end_time,omitempty"`
+}
+
+func (c *Client) ListJobs(filter JobFilter) ([]JobInfo, error) {
+ var out jobsResponse
+ if _, err := c.do("GET", "/slurmdb/v0.0."+c.apiVersion+"/jobs", filter,
&out); err != nil {
+ return nil, err
+ }
+ return out.Jobs, nil
+}
+
+func (c *Client) GetJob(id int64) (*JobInfo, error) {
+ var out jobsResponse
+ if _, err := c.do("GET", fmt.Sprintf("/slurmdb/v0.0.%s/job/%d",
c.apiVersion, id), nil, &out); err != nil {
+ return nil, err
+ }
+ if len(out.Jobs) == 0 {
+ return nil, fmt.Errorf("job %d not found", id)
+ }
+ return &out.Jobs[0], nil
+}
+
+func (c *Client) SubmitJob(request JobSubmitRequest) (*JobSubmitResponse,
error) {
+ var out JobSubmitResponse
+ if _, err := c.do("POST", fmt.Sprintf("/slurm/v0.0.%s/job/submit",
c.apiVersion), request, &out); err != nil {
+ return nil, err
+ }
+ return &out, nil
+}
diff --git a/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
b/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
new file mode 100644
index 000000000..8b68950ea
--- /dev/null
+++ b/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
@@ -0,0 +1,82 @@
+package client
+
+import (
+ "log"
+ "os"
+ "testing"
+ "time"
+)
+
+func TestListJobs(t *testing.T) {
+ if !IsLocalSlurmConfigAvailable() {
+ t.Skip("Skipping integration test for listing jobs because
local SLURM config is not available")
+ }
+
+ apiUrl := "http://localhost:6820"
+ user := os.Getenv("TEST_SLURM_USER")
+ token := os.Getenv("TEST_SLURM_TOKEN")
+ apiVersion := os.Getenv("TEST_SLURM_API_VERSION")
+
+ client := New(apiUrl, user, token, apiVersion)
+
+ JobSubmitRequest := JobSubmitRequest{
+ JobSubmitParam: JobSubmitParam{
+ Name: "test-job",
+ Account: "root",
+ Partition: "compute",
+ Tasks: 1,
+ CpusPerTask: 1,
+ TimeLimit: slurmNumber{
+ Set: true,
+ Infinite: false,
+ Number: 20, // value is 20, not 10; NOTE(review): Slurm REST docs give time_limit in minutes — confirm unit
+ },
+ CurrentWorkingDir: "/home/testuser",
+ Environment: []string{
+ "TEST_ENV_VAR=test_value",
+ },
+ // You can set other job parameters here if needed
+ },
+ Script: "#!/bin/bash\nsleep 1", // Simple script that sleeps for 1 second
+ }
+
+ currentTime := time.Now().Unix()
+ totalJobsToSubmit := 12
+ for i := 0; i < totalJobsToSubmit; i++ {
+ resp, err := client.SubmitJob(JobSubmitRequest)
+ if err != nil {
+ t.Fatalf("Failed to submit job: %v", err)
+ }
+
+ if resp.JobID == 0 {
+ t.Fatalf("Invalid job ID returned from job submission:
%d", resp.JobID)
+ }
+
+ log.Printf("Submitted job with ID: %d", resp.JobID)
+ }
+
+ sleepDuration := 20 // seconds
+ log.Printf("Sleeping for %d seconds to allow job to start...",
sleepDuration)
+ time.Sleep(time.Duration(sleepDuration) * time.Second)
+
+ filter := JobFilter{
+ // You can set filter parameters here if needed
+ Users: []string{"root"},
+ StartTime: &slurmNumber{Set: true, Infinite: false, Number:
currentTime},
+ }
+
+ jobs, err := client.ListJobs(filter)
+ if err != nil {
+ t.Fatalf("Failed to list jobs: %v", err)
+ }
+ if len(jobs) == 0 {
+ t.Log("No jobs found")
+ } else {
+ t.Logf("Found %d jobs", len(jobs))
+ if len(jobs) != totalJobsToSubmit {
+ t.Logf("Expected at %d jobs, but found %d",
totalJobsToSubmit, len(jobs))
+ } else {
+ t.Log("Successfully found all submitted jobs")
+ }
+ }
+}
diff --git a/connectors/SLURM/Rest-Client/pkg/client/types.go
b/connectors/SLURM/Rest-Client/pkg/client/types.go
index 3863b77a7..9909c9887 100644
--- a/connectors/SLURM/Rest-Client/pkg/client/types.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/types.go
@@ -184,3 +184,61 @@ func (a *Association) UnmarshalJSON(data []byte) error {
}
return nil
}
+
+type JobTime struct {
+ Elapsed int64 `json:"elapsed"`
+ Eligible int64 `json:"eligible"`
+ End int64 `json:"end"`
+ Start int64 `json:"start"`
+ Submission int64 `json:"submission"`
+ Suspended int64 `json:"suspended"`
+}
+
+type JobTresInfo struct {
+ Allocated []TRES `json:"allocated,omitempty"`
+ Requested []TRES `json:"requested,omitempty"`
+}
+
+type JobExitInfo struct {
+ Status []string `json:"status"`
+ ReturnCode slurmNumber `json:"return_code"`
+}
+
+type JobInfo struct {
+ Account string `json:"account"`
+ Cluster string `json:"cluster"`
+ Time JobTime `json:"time"`
+ JobID int64 `json:"job_id"`
+ Name string `json:"name"`
+ Partition string `json:"partition"`
+ QoS string `json:"qos"`
+ User string `json:"user"`
+ Nodes string `json:"nodes"`
+ Tres JobTresInfo `json:"tres"`
+ ExitCode JobExitInfo `json:"exit_code"`
+ DerivedExitCode JobExitInfo `json:"derived_exit_code"`
+}
+
+type JobSubmitParam struct {
+ Account string `json:"account"`
+ Partition string `json:"partition,omitempty"`
+ QoS string `json:"qos,omitempty"`
+ Name string `json:"name,omitempty"`
+ Tasks int64 `json:"tasks,omitempty"`
+ CurrentWorkingDir string
`json:"current_working_directory,omitempty"`
+ Environment []string `json:"environment,omitempty"`
+ CpusPerTask int64 `json:"cpus_per_task,omitempty"`
+ Memory int64 `json:"memory,omitempty"`
+ TimeLimit slurmNumber `json:"time_limit,omitempty"` // seconds per author; NOTE(review): Slurm REST docs give time_limit in minutes — confirm
+}
+
+type JobSubmitRequest struct {
+ JobSubmitParam JobSubmitParam `json:"job"`
+ Script string `json:"script"`
+}
+
+type JobSubmitResponse struct {
+ JobID int64 `json:"job_id"`
+ StepID string `json:"step_id"`
+ JobSubmitUserMsg string `json:"job_submit_user_msg"`
+}
diff --git a/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
new file mode 100644
index 000000000..fdb79d049
--- /dev/null
+++ b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
@@ -0,0 +1,25 @@
+package smonitor
+
+import (
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
+ "github.com/apache/airavata-custos/pkg/events"
+ "github.com/apache/airavata-custos/pkg/service"
+)
+
+type SlurmMonitor struct {
+ slurmClient *client.Client
+ eventBus *events.Bus
+ coreService service.CoreService
+}
+
+func NewSlurmMonitor(slurmClient *client.Client, eventBus *events.Bus,
coreService service.CoreService) *SlurmMonitor {
+ return &SlurmMonitor{
+ slurmClient: slurmClient,
+ eventBus: eventBus,
+ coreService: coreService,
+ }
+}
+
+func (m *SlurmMonitor) StartMonitor() {
+
+}
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
similarity index 79%
copy from connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
copy to connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
index 9eee18aef..f53933e8c 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
@@ -1,17 +1,15 @@
-package smapper
+package monitor
import (
"context"
- "log/slog"
- "os"
- "sync"
-
- "github.com/jmoiron/sqlx"
-
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
-
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
+
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/internal/smonitor"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
+ "github.com/jmoiron/sqlx"
+ "log/slog"
+ "os"
+ "sync"
)
func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, _ *sync.WaitGroup) error {
@@ -22,12 +20,12 @@ func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus
*events.Bus, coreServ
token := os.Getenv("SLURM_TOKEN")
apiVersion := os.Getenv("SLURM_API_VERSION")
if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
- slog.Info("SLURM API credentials not fully provided, skipping
SLURM Association Mapper connector")
+ slog.Info("SLURM API credentials not fully provided, skipping
SLURM Usage Monitor connector")
slog.Info("SLURM API credentials", "apiUrl", apiUrl, "user",
user, "token", token, "apiVersion", apiVersion)
return nil
}
slurmClient := client.New(apiUrl, user, token, apiVersion)
- subscribers.NewAssociationSubscriber(slurmClient, eventBus,
coreService).RegisterSubscribers()
+ go smonitor.NewSlurmMonitor(slurmClient, eventBus,
coreService).StartMonitor()
return nil
}
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 99fb221c0..4bcd0510d 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/pkg/amie"
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
+
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/pkg/monitor"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
)
@@ -45,6 +46,12 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB,
eventBus *events.Bus
return err
}
+ slog.Info("loading SLURM Usage Monitor connector")
+ if err := monitor.LoadConnector(ctx, database, eventBus, coreService,
wg); err != nil {
+ slog.Error("failed to load SLURM Usage Monitor connector",
"error", err)
+ return err
+ }
+
slog.Info("finished loading connectors")
return nil
}