This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch association-mapper in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit e3d03fc554f3a6fc7309c213c345faa7259b8fd9 Author: DImuthuUpe <[email protected]> AuthorDate: Wed May 13 10:07:23 2026 -0400 Association mapper client impl --- connectors/SLURM/Association-Mapper/.gitignore | 4 + connectors/SLURM/Association-Mapper/Makefile | 18 +++ connectors/SLURM/Association-Mapper/README.md | 57 ++++++- connectors/SLURM/Association-Mapper/go.mod | 3 + .../internal/operations/accounts.go | 53 ++++++ .../internal/operations/accounts_test.go | 99 ++++++++++++ .../internal/operations/associations.go | 62 +++++++ .../internal/operations/associations_test.go | 74 +++++++++ .../internal/operations/client.go | 71 ++++++++ .../internal/operations/client_test.go | 47 ++++++ .../Association-Mapper/internal/operations/tres.go | 39 +++++ .../internal/operations/tres_test.go | 33 ++++ .../internal/operations/types.go | 180 +++++++++++++++++++++ connectors/SLURM/Association-Mapper/main.go | 25 +++ 14 files changed, 763 insertions(+), 2 deletions(-) diff --git a/connectors/SLURM/Association-Mapper/.gitignore b/connectors/SLURM/Association-Mapper/.gitignore new file mode 100644 index 000000000..7be4b455e --- /dev/null +++ b/connectors/SLURM/Association-Mapper/.gitignore @@ -0,0 +1,4 @@ +/bin/ +*.test +*.out +.env diff --git a/connectors/SLURM/Association-Mapper/Makefile b/connectors/SLURM/Association-Mapper/Makefile new file mode 100644 index 000000000..32efab927 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/Makefile @@ -0,0 +1,18 @@ +.PHONY: build test run tidy clean + +BIN := bin/association-mapper + +build: + go build -o $(BIN) . + +run: build + ./$(BIN) + +test: + go test ./... + +tidy: + go mod tidy + +clean: + rm -rf bin diff --git a/connectors/SLURM/Association-Mapper/README.md b/connectors/SLURM/Association-Mapper/README.md index 906d5dfba..667af800f 100644 --- a/connectors/SLURM/Association-Mapper/README.md +++ b/connectors/SLURM/Association-Mapper/README.md @@ -1,3 +1,56 @@ -SLURM Association Creation Logic Goes into this Plugin +# SLURM Association-Mapper -This plugin is triggered when the allocation manager has processed an allocation request and relased it to downstream handlers \ No newline at end of file +SLURM association creation logic lives in this plugin. It is triggered when the allocation manager has processed an allocation request and released it to downstream handlers. It talks to `slurmrestd` to manage accounts, associations, and TRES limits. + +## Prerequisites + +- Go **1.24+** +- A reachable `slurmrestd` endpoint (for integration runs) plus a SLURM user name and JWT token + +## Layout + +``` +. +├── main.go # entry point +├── internal/operations/ # slurmrestd client + accounts/associations/TRES +├── go.mod +└── Makefile +``` + +Module path: `github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper`. + +## Build + +```bash +# from this directory +make build # produces bin/association-mapper +# or directly: +go build -o bin/association-mapper . +``` + +## Run + +```bash +make run # build, then ./bin/association-mapper +``` + +The service starts, logs `association-mapper started`, and blocks until it receives `SIGINT` or `SIGTERM`. + +## Test + +```bash +make test # go test ./... +go vet ./... # static checks +``` + +Tests are hermetic and use `httptest` — no live `slurmrestd` required. + +## Common make targets + +| Target | Description | +|---------|--------------------------------------| +| `build` | Compile the binary into `bin/` | +| `run` | Build and run | +| `test` | Run all unit tests | +| `tidy` | `go mod tidy` | +| `clean` | Remove `bin/` | diff --git a/connectors/SLURM/Association-Mapper/go.mod b/connectors/SLURM/Association-Mapper/go.mod new file mode 100644 index 000000000..0e1832e7c --- /dev/null +++ b/connectors/SLURM/Association-Mapper/go.mod @@ -0,0 +1,3 @@ +module github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper + +go 1.24.0 diff --git a/connectors/SLURM/Association-Mapper/internal/operations/accounts.go b/connectors/SLURM/Association-Mapper/internal/operations/accounts.go new file mode 100644 index 000000000..ea7d87708 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/accounts.go @@ -0,0 +1,53 @@ +// cli/internal/client/accounts.go +package operations + +import "fmt" + +type accountsResponse struct { + Accounts []Account `json:"accounts"` +} + +func (c *Client) ListAccounts() ([]Account, error) { + var out accountsResponse + if _, err := c.do("GET", "/slurmdb/v0.0.41/accounts", nil, &out); err != nil { + return nil, err + } + return out.Accounts, nil +} + +func (c *Client) GetAccount(name string) (*Account, error) { + var out accountsResponse + if _, err := c.do("GET", "/slurmdb/v0.0.41/account/"+name, nil, &out); err != nil { + return nil, err + } + if len(out.Accounts) == 0 { + return nil, fmt.Errorf("account %q not found", name) + } + return &out.Accounts[0], nil +} + +// CreateAccount creates an account and a cluster-scope association in a single +// call, mirroring `sacctmgr add account <name> cluster=<cluster>`. Without the +// association, the account record exists but is unusable (subsequent attempts +// to add a user silently no-op). slurmrestd exposes this via the +// /accounts_association/ endpoint, whose body wraps the account metadata and +// an association_condition naming the accounts + clusters to wire up. +func (c *Client) CreateAccount(a Account, cluster string) error { + body := map[string]any{ + "association_condition": map[string]any{ + "accounts": []string{a.Name}, + "clusters": []string{cluster}, + }, + "account": map[string]any{ + "description": a.Description, + "organization": a.Organization, + }, + } + _, err := c.do("POST", "/slurmdb/v0.0.41/accounts_association/", body, nil) + return err +} + +func (c *Client) DeleteAccount(name string) error { + _, err := c.do("DELETE", "/slurmdb/v0.0.41/account/"+name, nil, nil) + return err +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/accounts_test.go b/connectors/SLURM/Association-Mapper/internal/operations/accounts_test.go new file mode 100644 index 000000000..a427116e5 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/accounts_test.go @@ -0,0 +1,99 @@ +// cli/internal/client/accounts_test.go +package operations + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" +) + +func TestListAccounts(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/slurmdb/v0.0.41/accounts" || r.Method != "GET" { + t.Fatalf("unexpected %s %s", r.Method, r.URL.Path) + } + _, _ = w.Write([]byte(`{"accounts":[{"name":"root","description":"root account","organization":"artisan"}]}`)) + })) + defer srv.Close() + + c := New(srv.URL, "root", "t") + accts, err := c.ListAccounts() + if err != nil { + t.Fatal(err) + } + if len(accts) != 1 || accts[0].Name != "root" { + t.Errorf("accts = %+v", accts) + } +} + +func TestCreateAccount(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" || r.URL.Path != "/slurmdb/v0.0.41/accounts_association/" { + t.Fatalf("unexpected %s %s", r.Method, r.URL.Path) + } + body, _ := io.ReadAll(r.Body) + var payload struct { + AssociationCondition struct { + Accounts []string `json:"accounts"` + Clusters []string `json:"clusters"` + } `json:"association_condition"` + Account struct { + Description string `json:"description"` + Organization string `json:"organization"` + } `json:"account"` + } + _ = json.Unmarshal(body, &payload) + if len(payload.AssociationCondition.Accounts) != 1 || payload.AssociationCondition.Accounts[0] != "eng" { + t.Errorf("accounts = %+v", payload.AssociationCondition.Accounts) + } + if len(payload.AssociationCondition.Clusters) != 1 || payload.AssociationCondition.Clusters[0] != "artisan" { + t.Errorf("clusters = %+v", payload.AssociationCondition.Clusters) + } + if payload.Account.Description != "engineering" { + t.Errorf("description = %q", payload.Account.Description) + } + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + c := New(srv.URL, "root", "t") + if err := c.CreateAccount(Account{Name: "eng", Description: "engineering"}, "artisan"); err != nil { + t.Fatal(err) + } +} + +func TestDeleteAccount(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "DELETE" || r.URL.Path != "/slurmdb/v0.0.41/account/eng" { + t.Fatalf("unexpected %s %s", r.Method, r.URL.Path) + } + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + c := New(srv.URL, "root", "t") + if err := c.DeleteAccount("eng"); err != nil { + t.Fatal(err) + } +} + +func TestGetAccount(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/slurmdb/v0.0.41/account/eng" { + t.Fatalf("path = %s", r.URL.Path) + } + _, _ = w.Write([]byte(`{"accounts":[{"name":"eng","description":"engineering","organization":"artisan"}]}`)) + })) + defer srv.Close() + + c := New(srv.URL, "root", "t") + a, err := c.GetAccount("eng") + if err != nil { + t.Fatal(err) + } + if a.Name != "eng" || a.Description != "engineering" { + t.Errorf("a = %+v", a) + } +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/associations.go b/connectors/SLURM/Association-Mapper/internal/operations/associations.go new file mode 100644 index 000000000..ab34cd931 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/associations.go @@ -0,0 +1,62 @@ +// cli/internal/client/associations.go +package operations + +import "net/url" + +type AssocFilter struct { + Account string + User string + Cluster string + Partition string +} + +func (f AssocFilter) query() string { + v := url.Values{} + if f.Account != "" { + v.Set("account", f.Account) + } + if f.User != "" { + v.Set("user", f.User) + } + if f.Cluster != "" { + v.Set("cluster", f.Cluster) + } + if f.Partition != "" { + v.Set("partition", f.Partition) + } + return v.Encode() +} + +type associationsResponse struct { + Associations []Association `json:"associations"` +} + +func (c *Client) ListAssociations(f AssocFilter) ([]Association, error) { + path := "/slurmdb/v0.0.41/associations" + if q := f.query(); q != "" { + path += "?" + q + } + var out associationsResponse + if _, err := c.do("GET", path, nil, &out); err != nil { + return nil, err + } + return out.Associations, nil +} + +// UpsertAssociation creates or updates an association. slurmrestd POST +// /slurmdb/v0.0.41/associations is an upsert: if the (cluster,account,user) +// triple exists, it's updated; otherwise created. +func (c *Client) UpsertAssociation(a Association) error { + body := map[string]any{"associations": []Association{a}} + _, err := c.do("POST", "/slurmdb/v0.0.41/associations", body, nil) + return err +} + +func (c *Client) DeleteAssociation(f AssocFilter) error { + path := "/slurmdb/v0.0.41/association" + if q := f.query(); q != "" { + path += "?" + q + } + _, err := c.do("DELETE", path, nil, nil) + return err +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/associations_test.go b/connectors/SLURM/Association-Mapper/internal/operations/associations_test.go new file mode 100644 index 000000000..9082927b0 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/associations_test.go @@ -0,0 +1,74 @@ +// cli/internal/client/associations_test.go +package operations + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + + +func TestListAssociationsByAccount(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/slurmdb/v0.0.41/associations" { + t.Fatalf("path = %s", r.URL.Path) + } + if got, _ := url.QueryUnescape(r.URL.RawQuery); got != "account=eng" { + t.Fatalf("query = %q", got) + } + _, _ = w.Write([]byte(`{"associations":[{"account":"eng","cluster":"artisan","user":"alice","id_association":5}]}`)) + })) + defer srv.Close() + c := New(srv.URL, "root", "t") + assocs, err := c.ListAssociations(AssocFilter{Account: "eng"}) + if err != nil { + t.Fatal(err) + } + if len(assocs) != 1 || assocs[0].User != "alice" || assocs[0].ID != 5 { + t.Errorf("assocs = %+v", assocs) + } +} + +func TestCreateAssociation(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" || r.URL.Path != "/slurmdb/v0.0.41/associations" { + t.Fatalf("unexpected %s %s", r.Method, r.URL.Path) + } + b, _ := io.ReadAll(r.Body) + var payload struct { + Associations []Association `json:"associations"` + } + _ = json.Unmarshal(b, &payload) + if len(payload.Associations) != 1 || payload.Associations[0].Account != "eng" || + payload.Associations[0].User != "alice" { + t.Errorf("payload = %+v", payload) + } + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + c := New(srv.URL, "root", "t") + err := c.UpsertAssociation(Association{Account: "eng", Cluster: "artisan", User: "alice"}) + if err != nil { + t.Fatal(err) + } +} + +func TestDeleteAssociation(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "DELETE" || r.URL.Path != "/slurmdb/v0.0.41/association" { + t.Fatalf("unexpected %s %s", r.Method, r.URL.Path) + } + if r.URL.Query().Get("account") != "eng" || r.URL.Query().Get("user") != "alice" { + t.Fatalf("query = %v", r.URL.RawQuery) + } + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + c := New(srv.URL, "root", "t") + if err := c.DeleteAssociation(AssocFilter{Account: "eng", User: "alice"}); err != nil { + t.Fatal(err) + } +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/client.go b/connectors/SLURM/Association-Mapper/internal/operations/client.go new file mode 100644 index 000000000..62bd25b99 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/client.go @@ -0,0 +1,71 @@ +// cli/internal/client/client.go +package operations + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +type Client struct { + baseURL string + user string + token string + http *http.Client +} + +func New(baseURL, user, token string) *Client { + return &Client{ + baseURL: strings.TrimRight(baseURL, "/"), + user: user, + token: token, + http: &http.Client{Timeout: 30 * time.Second}, + } +} + +func (c *Client) do(method, path string, body any, out any) (*http.Response, error) { + var reqBody io.Reader + if body != nil { + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + reqBody = bytes.NewReader(buf) + } + req, err := http.NewRequest(method, c.baseURL+path, reqBody) + if err != nil { + return nil, err + } + req.Header.Set("X-SLURM-USER-NAME", c.user) + req.Header.Set("X-SLURM-USER-TOKEN", c.token) + req.Header.Set("Accept", "application/json") + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + resp, err := c.http.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode >= 400 { + defer resp.Body.Close() + buf, _ := io.ReadAll(resp.Body) + var er ErrorResponse + if json.Unmarshal(buf, &er) == nil && len(er.Errors) > 0 { + e := er.Errors[0] + return resp, fmt.Errorf("slurmrestd %d: %s (code=%d source=%s)", + resp.StatusCode, e.Description, e.ErrorNumber, e.Source) + } + return resp, fmt.Errorf("slurmrestd %d: %s", resp.StatusCode, string(buf)) + } + if out != nil { + defer resp.Body.Close() + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return resp, fmt.Errorf("decode: %w", err) + } + } + return resp, nil +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/client_test.go b/connectors/SLURM/Association-Mapper/internal/operations/client_test.go new file mode 100644 index 000000000..3d07b6456 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/client_test.go @@ -0,0 +1,47 @@ +// cli/internal/client/client_test.go +package operations + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestDoSetsHeaders(t *testing.T) { + var gotName, gotToken string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotName = r.Header.Get("X-SLURM-USER-NAME") + gotToken = r.Header.Get("X-SLURM-USER-TOKEN") + w.WriteHeader(200) + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + c := New(srv.URL, "root", "tok123") + if _, err := c.do("GET", "/slurm/v0.0.41/ping", nil, nil); err != nil { + t.Fatal(err) + } + if gotName != "root" { + t.Errorf("X-SLURM-USER-NAME = %q", gotName) + } + if gotToken != "tok123" { + t.Errorf("X-SLURM-USER-TOKEN = %q", gotToken) + } +} + +func TestDoUnwrapsErrors(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(400) + _, _ = w.Write([]byte(`{"errors":[{"description":"nope","error_number":42,"error":"Bad","source":"test"}]}`)) + })) + defer srv.Close() + c := New(srv.URL, "root", "tok") + _, err := c.do("GET", "/x", nil, nil) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "42") || !strings.Contains(err.Error(), "nope") { + t.Errorf("err = %v", err) + } +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/tres.go b/connectors/SLURM/Association-Mapper/internal/operations/tres.go new file mode 100644 index 000000000..313c58c59 --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/tres.go @@ -0,0 +1,39 @@ +// cli/internal/client/tres.go +package operations + +import ( + "fmt" + "strconv" + "strings" +) + +// ParseTRES parses a comma-separated TRES spec: +// +// "cpu=100", "cpu=100,mem=8000", "gres/gpu=2", "cpu=10,gres/gpu=4" +func ParseTRES(s string) ([]TRES, error) { + if strings.TrimSpace(s) == "" { + return nil, nil + } + parts := strings.Split(s, ",") + out := make([]TRES, 0, len(parts)) + for _, p := range parts { + kv := strings.SplitN(p, "=", 2) + if len(kv) != 2 { + return nil, fmt.Errorf("malformed TRES entry %q (want key=value)", p) + } + n, err := strconv.ParseInt(strings.TrimSpace(kv[1]), 10, 64) + if err != nil { + return nil, fmt.Errorf("TRES count %q not an integer", kv[1]) + } + t := TRES{Count: n} + key := strings.TrimSpace(kv[0]) + if slash := strings.Index(key, "/"); slash >= 0 { + t.Type = key[:slash] + t.Name = key[slash+1:] + } else { + t.Type = key + } + out = append(out, t) + } + return out, nil +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/tres_test.go b/connectors/SLURM/Association-Mapper/internal/operations/tres_test.go new file mode 100644 index 000000000..79ba50faf --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/tres_test.go @@ -0,0 +1,33 @@ +// cli/internal/client/tres_test.go +package operations + +import ( + "reflect" + "testing" +) + +func TestParseTRES(t *testing.T) { + cases := []struct { + in string + want []TRES + }{ + {"cpu=100", []TRES{{Type: "cpu", Count: 100}}}, + {"cpu=100,mem=8000", []TRES{{Type: "cpu", Count: 100}, {Type: "mem", Count: 8000}}}, + {"gres/gpu=4", []TRES{{Type: "gres", Name: "gpu", Count: 4}}}, + {"cpu=10,gres/gpu=2", []TRES{{Type: "cpu", Count: 10}, {Type: "gres", Name: "gpu", Count: 2}}}, + } + for _, c := range cases { + got, err := ParseTRES(c.in) + if err != nil { + t.Errorf("ParseTRES(%q): err=%v", c.in, err) + continue + } + if !reflect.DeepEqual(got, c.want) { + t.Errorf("ParseTRES(%q) = %+v, want %+v", c.in, got, c.want) + } + } + + if _, err := ParseTRES("nope"); err == nil { + t.Error("expected error on malformed input") + } +} diff --git a/connectors/SLURM/Association-Mapper/internal/operations/types.go b/connectors/SLURM/Association-Mapper/internal/operations/types.go new file mode 100644 index 000000000..75aaca6bd --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/operations/types.go @@ -0,0 +1,180 @@ +// cli/internal/client/types.go +package operations + +import "encoding/json" + +type ErrorResponse struct { + Errors []struct { + Description string `json:"description"` + ErrorNumber int `json:"error_number"` + Error string `json:"error"` + Source string `json:"source"` + } `json:"errors"` + Warnings []struct { + Description string `json:"description"` + Source string `json:"source"` + } `json:"warnings"` +} + +type TRES struct { + Type string `json:"type"` + Name string `json:"name,omitempty"` + Count int64 `json:"count"` +} + +type Account struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Organization string `json:"organization,omitempty"` +} + +type AssocLimits struct { + GrpJobs *int64 `json:"grp_jobs,omitempty"` + GrpTRES []TRES `json:"grp_tres,omitempty"` + GrpTRESMins []TRES `json:"grp_tres_mins,omitempty"` + MaxWallPM *int64 `json:"max_wall_pj,omitempty"` +} + +type Association struct { + Account string `json:"account"` + Cluster string `json:"cluster"` + User string `json:"user"` + Partition string `json:"partition,omitempty"` + ParentAccount string `json:"parent_account,omitempty"` + IsDefault *bool `json:"is_default,omitempty"` + ID int64 `json:"id_association,omitempty"` + // Limits is a logical grouping — slurmrestd v0.0.41 actually encodes limits + // in a nested `max` object per-association. We translate between the two + // shapes in Marshal/UnmarshalJSON below. + Limits AssocLimits `json:"-"` +} + +// slurmNumber matches slurmrestd's {set, infinite, number} triple used for +// all scalar limit values in the v0.0.41 accounting schema. +type slurmNumber struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int64 `json:"number"` +} + +func numPtr(n *int64) *slurmNumber { + if n == nil { + return nil + } + return &slurmNumber{Set: true, Number: *n} +} + +func ptrNum(n *slurmNumber) *int64 { + if n == nil || !n.Set || n.Infinite { + return nil + } + v := n.Number + return &v +} + +// assocMax is the v0.0.41 "max" sub-object inside each association. We only +// populate the fields we actually manage; slurmrestd ignores unset sub-objects. +type assocMax struct { + Jobs *assocMaxJobs `json:"jobs,omitempty"` + TRES *assocMaxTRES `json:"tres,omitempty"` +} + +type assocMaxJobs struct { + Per *assocMaxJobsPer `json:"per,omitempty"` +} + +type assocMaxJobsPer struct { + Count *slurmNumber `json:"count,omitempty"` // GrpJobs + WallClock *slurmNumber `json:"wall_clock,omitempty"` // MaxWallDurationPerJob (seconds) +} + +type assocMaxTRES struct { + Total []TRES `json:"total,omitempty"` // GrpTRES + Group *assocMaxTRESGp `json:"group,omitempty"` +} + +type assocMaxTRESGp struct { + Minutes []TRES `json:"minutes,omitempty"` // GrpTRESMins +} + +// assocWire is the on-the-wire association record used for both request +// marshaling and response unmarshaling. `user` is emitted even when empty +// because slurmrestd rejects payloads without it (error 9200). +type assocWire struct { + Account string `json:"account"` + Cluster string `json:"cluster"` + User string `json:"user"` + Partition string `json:"partition,omitempty"` + ParentAccount string `json:"parent_account,omitempty"` + IsDefault *bool `json:"is_default,omitempty"` + ID int64 `json:"id_association,omitempty"` + Max *assocMax `json:"max,omitempty"` +} + +func (a Association) MarshalJSON() ([]byte, error) { + w := assocWire{ + Account: a.Account, + Cluster: a.Cluster, + User: a.User, + Partition: a.Partition, + ParentAccount: a.ParentAccount, + IsDefault: a.IsDefault, + ID: a.ID, + } + m := &assocMax{} + touched := false + if a.Limits.GrpJobs != nil || a.Limits.MaxWallPM != nil { + per := &assocMaxJobsPer{} + if a.Limits.GrpJobs != nil { + per.Count = numPtr(a.Limits.GrpJobs) + } + if a.Limits.MaxWallPM != nil { + per.WallClock = numPtr(a.Limits.MaxWallPM) + } + m.Jobs = &assocMaxJobs{Per: per} + touched = true + } + if len(a.Limits.GrpTRES) > 0 || len(a.Limits.GrpTRESMins) > 0 { + t := &assocMaxTRES{} + if len(a.Limits.GrpTRES) > 0 { + t.Total = a.Limits.GrpTRES + } + if len(a.Limits.GrpTRESMins) > 0 { + t.Group = &assocMaxTRESGp{Minutes: a.Limits.GrpTRESMins} + } + m.TRES = t + touched = true + } + if touched { + w.Max = m + } + return json.Marshal(w) +} + +func (a *Association) UnmarshalJSON(data []byte) error { + var w assocWire + if err := json.Unmarshal(data, &w); err != nil { + return err + } + a.Account = w.Account + a.Cluster = w.Cluster + a.User = w.User + a.Partition = w.Partition + a.ParentAccount = w.ParentAccount + a.IsDefault = w.IsDefault + a.ID = w.ID + a.Limits = AssocLimits{} + if w.Max != nil { + if w.Max.Jobs != nil && w.Max.Jobs.Per != nil { + a.Limits.GrpJobs = ptrNum(w.Max.Jobs.Per.Count) + a.Limits.MaxWallPM = ptrNum(w.Max.Jobs.Per.WallClock) + } + if w.Max.TRES != nil { + a.Limits.GrpTRES = w.Max.TRES.Total + if w.Max.TRES.Group != nil { + a.Limits.GrpTRESMins = w.Max.TRES.Group.Minutes + } + } + } + return nil +} diff --git a/connectors/SLURM/Association-Mapper/main.go b/connectors/SLURM/Association-Mapper/main.go new file mode 100644 index 000000000..d04dcad1e --- /dev/null +++ b/connectors/SLURM/Association-Mapper/main.go @@ -0,0 +1,25 @@ +// Package main is the entry point for the SLURM Association-Mapper connector. +// +// It consumes allocation events released by the allocation manager and +// materializes them as SLURM associations via slurmrestd. +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" +) + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + slog.SetDefault(logger) + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + logger.Info("association-mapper started") + <-ctx.Done() + logger.Info("association-mapper stopped") +}
