This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 3b0bf6845ed2b1f949eba306180e2c2ae1b6db06 Author: DImuthuUpe <[email protected]> AuthorDate: Tue May 19 14:36:11 2026 -0400 Moving member-specific quota to a separate data model and implementing slurm subscriber --- .../internal/subscribers/members.go | 94 +++++++++- .../internal/subscribers/subscriber.go | 1 + docs/API-Docs.md | 94 +++++++--- ...location_membership_resource_overrides.down.sql | 21 +++ ...allocation_membership_resource_overrides.up.sql | 36 ++++ internal/server/server.go | 88 ++++++++-- ...llocation_membership_resource_override_store.go | 123 +++++++++++++ .../store/compute_allocation_membership_store.go | 11 +- internal/store/store.go | 20 +++ ...ation_membership_resource_override_subscribe.go | 67 ++++++++ pkg/events/types.go | 7 + pkg/models/allocation.go | 14 +- pkg/service/compute_allocation_membership.go | 27 --- ...pute_allocation_membership_resource_override.go | 191 +++++++++++++++++++++ pkg/service/service.go | 4 + 15 files changed, 721 insertions(+), 77 deletions(-) diff --git a/connectors/SLURM/Association-Mapper/internal/subscribers/members.go b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go index cd609d5af..687ecd8d5 100644 --- a/connectors/SLURM/Association-Mapper/internal/subscribers/members.go +++ b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go @@ -4,12 +4,15 @@ import ( "log/slog" "context" + "time" + client "github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations" "github.com/apache/airavata-custos/pkg/models" - "time" ) -func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipCreation(membership models.ComputeAllocationMembership) { +func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipCreation( + membership models.ComputeAllocationMembership) { + slog.Info("Received compute allocation membership creation event", "membership", membership) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ 
-48,6 +51,7 @@ func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipCreation(m if len(resources) == 0 { slog.Warn("No resources found for allocation, defaulting to partition 'default'", "allocation_id", allocation.ID) } + association := client.Association{ Account: allocation.Name, Cluster: cluster.Name, @@ -62,3 +66,89 @@ func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipCreation(m slog.Info("Successfully upserted association", "association", association) } } + +func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipResourceOverrideCreation( + override models.ComputeAllocationMembershipResourceOverride) { + + slog.Info("Received compute allocation membership resource override creation event", "override", override) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // TODO: read per-resource override via + membership, err := a.coreService.GetComputeAllocationMembership(ctx, override.ComputeAllocationMembershipID) + if err != nil { + slog.Error("Failed to get compute allocation membership for resource override creation", "error", err) + return + } + + allocationResource, err := a.coreService.GetComputeAllocationResource(ctx, override.ComputeAllocationResourceID) + if err != nil { + slog.Error("Failed to get compute allocation resource for resource override creation", "error", err) + return + } + + slog.Info("Received compute allocation membership resource override creation event", + "membership", membership, "resource", allocationResource, "override", override) + + allocation, err := a.coreService.GetComputeAllocation(ctx, membership.ComputeAllocationID) + if err != nil { + slog.Error("Failed to get compute allocation", "error", err) + return + } + + cluster, err := a.coreService.GetComputeCluster(ctx, allocation.ComputeClusterID) + if err != nil { + slog.Error("Failed to get compute cluster", "error", err) + return + } + + user, err := a.coreService.GetUser(ctx, 
membership.UserID) + if err != nil { + slog.Error("Failed to get user", "error", err) + return + } + + csu, err := a.coreService.GetComputeClusterUserByPair(ctx, cluster.ID, user.ID) // TODO: use this to get the local username for the association instead of assuming it's the same as the Airavata Custos username + if err != nil { + slog.Error("Failed to get compute cluster user by pair", "error", err) + return + } + + grpTres := []client.TRES{} + + if allocationResource.ResourceType == "GrpTRES" { + grpTres = append(grpTres, client.TRES{ + Type: allocationResource.Name, + Count: override.OverriddenResourceAmount, // override.OverriddenResourceAmount is the SU amount, but SLURM needs the actual resource amount (e.g., number of CPU hours), so we need to convert it using the rate for the resource + }) + } + + grpTresMins := []client.TRES{} + + if allocationResource.ResourceType == "GrpTRESMins" { + grpTresMins = append(grpTresMins, client.TRES{ + Type: allocationResource.Name, + Count: override.OverriddenResourceAmount, + }) + } + + limits := client.AssocLimits{ + GrpTRES: grpTres, + GrpTRESMins: grpTresMins, + } + + association := client.Association{ + Account: allocation.Name, + Cluster: cluster.Name, + User: csu.LocalUsername, + Partition: allocationResource.Name, + Limits: limits, + } + + err = a.slurmClient.UpsertAssociation(association) + if err != nil { + slog.Error("Failed to upsert association for membership resource override creation", "error", err) + } else { + slog.Info("Successfully upserted association for membership resource override creation", "association", association) + } +} diff --git a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go index 658dcc073..17b65cce8 100644 --- a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go +++ b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go @@ -23,4 +23,5 @@ func (a 
*AssociationSubscriber) RegisterSubscribers() { a.eventBus.SubscribeComputeAllocationDeleted(a.SubscribeToComputeAllocationDeletion) a.eventBus.SubscribeComputeAllocationUpdated(a.SubscribeToComputeAllocationUpdate) a.eventBus.SubscribeComputeAllocationMembershipCreated(a.SubscribeToComputeAllocationMembershipCreation) + a.eventBus.SubscribeComputeAllocationMembershipResourceOverrideCreated(a.SubscribeToComputeAllocationMembershipResourceOverrideCreation) } diff --git a/docs/API-Docs.md b/docs/API-Docs.md index 81aa716db..b571c8255 100644 --- a/docs/API-Docs.md +++ b/docs/API-Docs.md @@ -1125,7 +1125,6 @@ Create a new membership. { "compute_allocation_id": "alloc-123", "user_id": "user-456", - "allocation_amount": 50000, "start_time": "2026-01-01T00:00:00Z", "end_time": "2026-12-31T23:59:59Z", "membership_status": "ACTIVE" @@ -1136,6 +1135,8 @@ Create a new membership. existing rows. - `membership_status` defaults to `ACTIVE` when omitted. - `id` is generated server-side when omitted. +- Per-resource SU caps for this membership are stored separately as + `ComputeAllocationMembershipResourceOverride` rows; see below. **Errors** @@ -1151,21 +1152,6 @@ Retrieve a membership by ID. Replace mutable fields of a membership. Fields left blank/zero in the request body fall back to the stored value (partial updates). -### PUT `/compute-allocation-memberships/{id}/allocation-amount` - -Update only the SU sub-allocation granted to the user. - -**Request body** - -```json -{ "allocation_amount": 75000 } -``` - -**Errors** - -- `400` — negative `allocation_amount`. -- `404` — no membership with the given ID. - ### PUT `/compute-allocation-memberships/{id}/status` Update only the lifecycle status of the membership (`ACTIVE`, `INACTIVE`, @@ -1196,6 +1182,62 @@ List every membership recorded against the given allocation, ordered by List every allocation membership held by the given user, ordered by `start_time` ascending. 
+### GET `/compute-allocation-memberships/{id}/resource-overrides` + +List every per-resource override recorded against the given membership. + +--- + +## Compute Allocation Membership Resource Overrides + +A `ComputeAllocationMembershipResourceOverride` records the SU amount of a +specific resource (`ComputeAllocationResource`) that has been granted to a +specific membership. There can be at most one override per +`(compute_allocation_membership_id, compute_allocation_resource_id)` pair +(enforced by a unique key). Overrides are cascade-deleted when either the +parent membership or the parent resource is removed. + +### POST `/compute-allocation-membership-resource-overrides` + +Create a new override. + +**Request body** + +```json +{ + "compute_allocation_membership_id": "membership-123", + "compute_allocation_resource_id": "resource-456", + "overridden_resource_amount": 10000 +} +``` + +- All three fields are required. +- `overridden_resource_amount` must be non-negative. +- `id` is generated server-side when omitted. + +**Errors** + +- `400` — missing required fields, referenced membership/resource not found, + or negative `overridden_resource_amount`. +- `409` — an override already exists for this `(membership, resource)` pair. + +### GET `/compute-allocation-membership-resource-overrides/{id}` + +Retrieve an override by ID. + +### PUT `/compute-allocation-membership-resource-overrides/{id}` + +Replace mutable fields of an override. Fields left blank/zero in the request +body fall back to the stored value (partial updates). + +### DELETE `/compute-allocation-membership-resource-overrides/{id}` + +Remove an override. + +### GET `/compute-allocation-resources/{id}/membership-overrides` + +List every membership override referencing the given resource. 
+ --- ## End-to-end example @@ -1229,7 +1271,7 @@ ALLOC_ID=$(curl -s -X POST $BASE/compute-allocations \ RES_ID=$(curl -s -X POST $BASE/compute-allocation-resources \ -H 'Content-Type: application/json' \ - -d '{"name":"debug","resource_type":"CPU","resource_amount":24}' | jq -r .id) + -d '{"name":"debug","resource_type":"GrpTRES","resource_amount":24}' | jq -r .id) # Attach the resource to the allocation. curl -s -X POST $BASE/compute-allocations/$ALLOC_ID/resources \ @@ -1270,7 +1312,6 @@ MEMBERSHIP_ID=$(curl -s -X POST $BASE/compute-allocation-memberships \ -d "{ \"compute_allocation_id\":\"$ALLOC_ID\", \"user_id\":\"$CLUSTER_USER_ACCT_ID\", - \"allocation_amount\":50000, \"start_time\":\"2026-04-01T00:00:00Z\", \"end_time\":\"2026-06-30T23:59:59Z\", \"membership_status\":\"ACTIVE\" @@ -1282,11 +1323,22 @@ curl -s $BASE/compute-allocations/$ALLOC_ID/memberships | jq # List every allocation this user is a member of. curl -s $BASE/users/$CLUSTER_USER_ACCT_ID/compute-allocation-memberships | jq -# Bump the user's SU sub-allocation. -curl -s -X PUT $BASE/compute-allocation-memberships/$MEMBERSHIP_ID/allocation-amount \ +# Grant the membership a per-resource SU override on the debug resource. +OVERRIDE_ID=$(curl -s -X POST $BASE/compute-allocation-membership-resource-overrides \ + -H 'Content-Type: application/json' \ + -d "{ + \"compute_allocation_membership_id\":\"$MEMBERSHIP_ID\", + \"compute_allocation_resource_id\":\"$RES_ID\", + \"overridden_resource_amount\":10000 + }" | jq -r .id) + +# Bump the override amount. +curl -s -X PUT $BASE/compute-allocation-membership-resource-overrides/$OVERRIDE_ID \ -H 'Content-Type: application/json' \ - -d '{"allocation_amount":75000}' | jq + -d '{"overridden_resource_amount":15000}' | jq +# List every resource override for the membership. +curl -s $BASE/compute-allocation-memberships/$MEMBERSHIP_ID/resource-overrides | jq # Record a usage diff against the allocation. 
diff --git a/internal/db/migrations/000011_compute_allocation_membership_resource_overrides.down.sql b/internal/db/migrations/000011_compute_allocation_membership_resource_overrides.down.sql new file mode 100644 index 000000000..e07d3db11 --- /dev/null +++ b/internal/db/migrations/000011_compute_allocation_membership_resource_overrides.down.sql @@ -0,0 +1,21 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +DROP TABLE IF EXISTS compute_allocation_membership_resource_overrides; + +ALTER TABLE compute_allocation_memberships + ADD COLUMN allocation_amount BIGINT NOT NULL DEFAULT 0 AFTER user_id; diff --git a/internal/db/migrations/000011_compute_allocation_membership_resource_overrides.up.sql b/internal/db/migrations/000011_compute_allocation_membership_resource_overrides.up.sql new file mode 100644 index 000000000..946bbb029 --- /dev/null +++ b/internal/db/migrations/000011_compute_allocation_membership_resource_overrides.up.sql @@ -0,0 +1,36 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. 
The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE compute_allocation_memberships + DROP COLUMN allocation_amount; + +CREATE TABLE IF NOT EXISTS compute_allocation_membership_resource_overrides +( + id VARCHAR(255) NOT NULL, + compute_allocation_membership_id VARCHAR(255) NOT NULL, + compute_allocation_resource_id VARCHAR(255) NOT NULL, + overridden_resource_amount BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_camro_membership_resource (compute_allocation_membership_id, compute_allocation_resource_id), + KEY idx_camro_resource (compute_allocation_resource_id), + CONSTRAINT fk_camro_membership FOREIGN KEY (compute_allocation_membership_id) + REFERENCES compute_allocation_memberships (id) ON DELETE CASCADE, + CONSTRAINT fk_camro_resource FOREIGN KEY (compute_allocation_resource_id) + REFERENCES compute_allocation_resources (id) ON DELETE CASCADE +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; diff --git a/internal/server/server.go b/internal/server/server.go index 4b3e20fd2..960799fd8 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -111,11 +111,17 @@ func (s *Server) routes() { s.mux.HandleFunc("POST /compute-allocation-memberships", s.createComputeAllocationMembership) s.mux.HandleFunc("GET 
/compute-allocation-memberships/{id}", s.getComputeAllocationMembership) s.mux.HandleFunc("PUT /compute-allocation-memberships/{id}", s.updateComputeAllocationMembership) - s.mux.HandleFunc("PUT /compute-allocation-memberships/{id}/allocation-amount", s.updateMembershipAllocationAmount) s.mux.HandleFunc("PUT /compute-allocation-memberships/{id}/status", s.updateMembershipStatus) s.mux.HandleFunc("DELETE /compute-allocation-memberships/{id}", s.deleteComputeAllocationMembership) s.mux.HandleFunc("GET /compute-allocations/{id}/memberships", s.listMembersForAllocation) s.mux.HandleFunc("GET /users/{id}/compute-allocation-memberships", s.listAllocationsForUser) + s.mux.HandleFunc("GET /compute-allocation-memberships/{id}/resource-overrides", s.listOverridesForMembership) + + s.mux.HandleFunc("POST /compute-allocation-membership-resource-overrides", s.createComputeAllocationMembershipResourceOverride) + s.mux.HandleFunc("GET /compute-allocation-membership-resource-overrides/{id}", s.getComputeAllocationMembershipResourceOverride) + s.mux.HandleFunc("PUT /compute-allocation-membership-resource-overrides/{id}", s.updateComputeAllocationMembershipResourceOverride) + s.mux.HandleFunc("DELETE /compute-allocation-membership-resource-overrides/{id}", s.deleteComputeAllocationMembershipResourceOverride) + s.mux.HandleFunc("GET /compute-allocation-resources/{id}/membership-overrides", s.listOverridesForResource) s.mux.HandleFunc("POST /compute-allocation-usages", s.createComputeAllocationUsage) s.mux.HandleFunc("GET /compute-allocation-usages/{id}", s.getComputeAllocationUsage) @@ -652,22 +658,6 @@ func (s *Server) updateComputeAllocationMembership(w http.ResponseWriter, r *htt writeJSON(w, http.StatusOK, updated) } -func (s *Server) updateMembershipAllocationAmount(w http.ResponseWriter, r *http.Request) { - var body struct { - AllocationAmount int64 `json:"allocation_amount"` - } - if err := decodeJSON(r, &body); err != nil { - writeError(w, http.StatusBadRequest, err) - 
return - } - updated, err := s.svc.UpdateMembershipAllocationAmount(r.Context(), r.PathValue("id"), body.AllocationAmount) - if err != nil { - writeServiceError(w, err) - return - } - writeJSON(w, http.StatusOK, updated) -} - func (s *Server) updateMembershipStatus(w http.ResponseWriter, r *http.Request) { var body struct { MembershipStatus models.AllocationStatus `json:"membership_status"` @@ -759,6 +749,70 @@ func (s *Server) listUsagesByUser(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, rows) } +func (s *Server) createComputeAllocationMembershipResourceOverride(w http.ResponseWriter, r *http.Request) { + var o models.ComputeAllocationMembershipResourceOverride + if err := decodeJSON(r, &o); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + created, err := s.svc.CreateComputeAllocationMembershipResourceOverride(r.Context(), &o) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) getComputeAllocationMembershipResourceOverride(w http.ResponseWriter, r *http.Request) { + o, err := s.svc.GetComputeAllocationMembershipResourceOverride(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, o) +} + +func (s *Server) updateComputeAllocationMembershipResourceOverride(w http.ResponseWriter, r *http.Request) { + var o models.ComputeAllocationMembershipResourceOverride + if err := decodeJSON(r, &o); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + o.ID = r.PathValue("id") + updated, err := s.svc.UpdateComputeAllocationMembershipResourceOverride(r.Context(), &o) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, updated) +} + +func (s *Server) deleteComputeAllocationMembershipResourceOverride(w http.ResponseWriter, r *http.Request) { + if err := s.svc.DeleteComputeAllocationMembershipResourceOverride(r.Context(), r.PathValue("id")); 
err != nil { + writeServiceError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) listOverridesForMembership(w http.ResponseWriter, r *http.Request) { + rows, err := s.svc.ListOverridesForMembership(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, rows) +} + +func (s *Server) listOverridesForResource(w http.ResponseWriter, r *http.Request) { + rows, err := s.svc.ListOverridesForResource(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, rows) +} + func (s *Server) getTotalSUUsageForAllocation(w http.ResponseWriter, r *http.Request) { total, err := s.svc.GetTotalSUUsageForAllocation(r.Context(), r.PathValue("id")) if err != nil { diff --git a/internal/store/compute_allocation_membership_resource_override_store.go b/internal/store/compute_allocation_membership_resource_override_store.go new file mode 100644 index 000000000..1e0f0d1b0 --- /dev/null +++ b/internal/store/compute_allocation_membership_resource_override_store.go @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +const computeAllocationMembershipResourceOverrideColumns = "id, compute_allocation_membership_id, compute_allocation_resource_id, overridden_resource_amount" + +type mysqlComputeAllocationMembershipResourceOverrideStore struct { + db *sqlx.DB +} + +// NewComputeAllocationMembershipResourceOverrideStore returns a MySQL-backed +// ComputeAllocationMembershipResourceOverrideStore. +func NewComputeAllocationMembershipResourceOverrideStore(db *sqlx.DB) ComputeAllocationMembershipResourceOverrideStore { + return &mysqlComputeAllocationMembershipResourceOverrideStore{db: db} +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) FindByID(ctx context.Context, id string) (*models.ComputeAllocationMembershipResourceOverride, error) { + var o models.ComputeAllocationMembershipResourceOverride + err := s.db.GetContext(ctx, &o, + `SELECT `+computeAllocationMembershipResourceOverrideColumns+` + FROM compute_allocation_membership_resource_overrides + WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &o, nil +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) FindByPair(ctx context.Context, membershipID, resourceID string) (*models.ComputeAllocationMembershipResourceOverride, error) { + var o models.ComputeAllocationMembershipResourceOverride + err := s.db.GetContext(ctx, &o, + `SELECT `+computeAllocationMembershipResourceOverrideColumns+` + FROM compute_allocation_membership_resource_overrides + WHERE compute_allocation_membership_id = ? 
AND compute_allocation_resource_id = ?`, + membershipID, resourceID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &o, nil +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) FindByMembership(ctx context.Context, membershipID string) ([]models.ComputeAllocationMembershipResourceOverride, error) { + var rows []models.ComputeAllocationMembershipResourceOverride + err := s.db.SelectContext(ctx, &rows, + `SELECT `+computeAllocationMembershipResourceOverrideColumns+` + FROM compute_allocation_membership_resource_overrides + WHERE compute_allocation_membership_id = ? + ORDER BY compute_allocation_resource_id`, membershipID) + if err != nil { + return nil, err + } + return rows, nil +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) FindByResource(ctx context.Context, resourceID string) ([]models.ComputeAllocationMembershipResourceOverride, error) { + var rows []models.ComputeAllocationMembershipResourceOverride + err := s.db.SelectContext(ctx, &rows, + `SELECT `+computeAllocationMembershipResourceOverrideColumns+` + FROM compute_allocation_membership_resource_overrides + WHERE compute_allocation_resource_id = ? 
+ ORDER BY compute_allocation_membership_id`, resourceID) + if err != nil { + return nil, err + } + return rows, nil +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) Create(ctx context.Context, tx *sql.Tx, o *models.ComputeAllocationMembershipResourceOverride) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO compute_allocation_membership_resource_overrides + (id, compute_allocation_membership_id, compute_allocation_resource_id, overridden_resource_amount) + VALUES (?, ?, ?, ?)`, + o.ID, o.ComputeAllocationMembershipID, o.ComputeAllocationResourceID, o.OverriddenResourceAmount) + return err +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) Update(ctx context.Context, tx *sql.Tx, o *models.ComputeAllocationMembershipResourceOverride) error { + _, err := tx.ExecContext(ctx, + `UPDATE compute_allocation_membership_resource_overrides + SET compute_allocation_membership_id = ?, + compute_allocation_resource_id = ?, + overridden_resource_amount = ? 
+ WHERE id = ?`, + o.ComputeAllocationMembershipID, o.ComputeAllocationResourceID, o.OverriddenResourceAmount, o.ID) + return err +} + +func (s *mysqlComputeAllocationMembershipResourceOverrideStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, + `DELETE FROM compute_allocation_membership_resource_overrides WHERE id = ?`, id) + return err +} diff --git a/internal/store/compute_allocation_membership_store.go b/internal/store/compute_allocation_membership_store.go index aae350ac9..baf217782 100644 --- a/internal/store/compute_allocation_membership_store.go +++ b/internal/store/compute_allocation_membership_store.go @@ -27,7 +27,7 @@ import ( "github.com/apache/airavata-custos/pkg/models" ) -const computeAllocationMembershipColumns = "id, compute_allocation_id, user_id, allocation_amount, start_time, end_time, membership_status" +const computeAllocationMembershipColumns = "id, compute_allocation_id, user_id, start_time, end_time, membership_status" type mysqlComputeAllocationMembershipStore struct { db *sqlx.DB @@ -96,9 +96,9 @@ func (s *mysqlComputeAllocationMembershipStore) FindByUser(ctx context.Context, func (s *mysqlComputeAllocationMembershipStore) Create(ctx context.Context, tx *sql.Tx, m *models.ComputeAllocationMembership) error { _, err := tx.ExecContext(ctx, `INSERT INTO compute_allocation_memberships - (id, compute_allocation_id, user_id, allocation_amount, start_time, end_time, membership_status) - VALUES (?, ?, ?, ?, ?, ?, ?)`, - m.ID, m.ComputeAllocationID, m.UserID, m.AllocationAmount, m.StartTime, m.EndTime, string(m.MembershipStatus)) + (id, compute_allocation_id, user_id, start_time, end_time, membership_status) + VALUES (?, ?, ?, ?, ?, ?)`, + m.ID, m.ComputeAllocationID, m.UserID, m.StartTime, m.EndTime, string(m.MembershipStatus)) return err } @@ -107,12 +107,11 @@ func (s *mysqlComputeAllocationMembershipStore) Update(ctx context.Context, tx * `UPDATE compute_allocation_memberships SET 
compute_allocation_id = ?, user_id = ?, - allocation_amount = ?, start_time = ?, end_time = ?, membership_status = ? WHERE id = ?`, - m.ComputeAllocationID, m.UserID, m.AllocationAmount, m.StartTime, m.EndTime, string(m.MembershipStatus), m.ID) + m.ComputeAllocationID, m.UserID, m.StartTime, m.EndTime, string(m.MembershipStatus), m.ID) return err } diff --git a/internal/store/store.go b/internal/store/store.go index 4dddec180..8066d87e8 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -250,6 +250,26 @@ type ComputeAllocationMembershipStore interface { Delete(ctx context.Context, tx *sql.Tx, id string) error } +// ComputeAllocationMembershipResourceOverrideStore defines persistence +// operations for per-resource overrides of the SU amount granted to a +// compute allocation membership. +type ComputeAllocationMembershipResourceOverrideStore interface { + // FindByID returns the override with the given ID, or nil if it does not exist. + FindByID(ctx context.Context, id string) (*models.ComputeAllocationMembershipResourceOverride, error) + // FindByPair returns the override for a (membership, resource) pair, or nil if absent. + FindByPair(ctx context.Context, membershipID, resourceID string) (*models.ComputeAllocationMembershipResourceOverride, error) + // FindByMembership returns every override recorded against the given membership. + FindByMembership(ctx context.Context, membershipID string) ([]models.ComputeAllocationMembershipResourceOverride, error) + // FindByResource returns every override referencing the given resource. + FindByResource(ctx context.Context, resourceID string) ([]models.ComputeAllocationMembershipResourceOverride, error) + // Create inserts a new override within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, o *models.ComputeAllocationMembershipResourceOverride) error + // Update replaces mutable fields of an existing override within the provided transaction. 
+ Update(ctx context.Context, tx *sql.Tx, o *models.ComputeAllocationMembershipResourceOverride) error + // Delete removes an override by ID within the provided transaction. + Delete(ctx context.Context, tx *sql.Tx, id string) error +} + // ComputeAllocationUsageStore defines persistence operations for the // append-only log of resource consumption events charged against a compute // allocation. diff --git a/pkg/events/compute_allocation_membership_resource_override_subscribe.go b/pkg/events/compute_allocation_membership_resource_override_subscribe.go new file mode 100644 index 000000000..ea77026fa --- /dev/null +++ b/pkg/events/compute_allocation_membership_resource_override_subscribe.go @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeAllocationMembershipResourceOverrideHandler handles lifecycle +// events for membership resource overrides with a typed payload. 
+type ComputeAllocationMembershipResourceOverrideHandler func(o models.ComputeAllocationMembershipResourceOverride) + +// SubscribeComputeAllocationMembershipResourceOverrideCreated registers a +// typed handler invoked whenever a +// compute_allocation_membership_resource_override::create event is published. +func (b *Bus) SubscribeComputeAllocationMembershipResourceOverrideCreated(handler ComputeAllocationMembershipResourceOverrideHandler) { + b.subscribeMembershipResourceOverride(ComputeAllocationMembershipResourceOverrideCreateEvent, handler) +} + +// SubscribeComputeAllocationMembershipResourceOverrideUpdated registers a +// typed handler invoked whenever a +// compute_allocation_membership_resource_override::update event is published. +func (b *Bus) SubscribeComputeAllocationMembershipResourceOverrideUpdated(handler ComputeAllocationMembershipResourceOverrideHandler) { + b.subscribeMembershipResourceOverride(ComputeAllocationMembershipResourceOverrideUpdateEvent, handler) +} + +// SubscribeComputeAllocationMembershipResourceOverrideDeleted registers a +// typed handler invoked whenever a +// compute_allocation_membership_resource_override::delete event is published. 
+func (b *Bus) SubscribeComputeAllocationMembershipResourceOverrideDeleted(handler ComputeAllocationMembershipResourceOverrideHandler) { + b.subscribeMembershipResourceOverride(ComputeAllocationMembershipResourceOverrideDeleteEvent, handler) +} + +func (b *Bus) subscribeMembershipResourceOverride(topic EventType, handler ComputeAllocationMembershipResourceOverrideHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch o := value.(type) { + case models.ComputeAllocationMembershipResourceOverride: + handler(o) + case *models.ComputeAllocationMembershipResourceOverride: + if o != nil { + handler(*o) + } + default: + slog.Warn("compute allocation membership resource override event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/types.go b/pkg/events/types.go index 44758f0e4..8ae8de802 100644 --- a/pkg/events/types.go +++ b/pkg/events/types.go @@ -96,6 +96,13 @@ const ( ComputeAllocationMembershipDeleteEvent EventType = "compute_allocation_membership::delete" ) +// ComputeAllocationMembershipResourceOverride lifecycle message types. +const ( + ComputeAllocationMembershipResourceOverrideCreateEvent EventType = "compute_allocation_membership_resource_override::create" + ComputeAllocationMembershipResourceOverrideUpdateEvent EventType = "compute_allocation_membership_resource_override::update" + ComputeAllocationMembershipResourceOverrideDeleteEvent EventType = "compute_allocation_membership_resource_override::delete" +) + // ComputeAllocationResourceMapping lifecycle message types. 
const ( ComputeAllocationResourceMappingCreateEvent EventType = "compute_allocation_resource_mapping::create" diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go index 7813dcc2a..ca95595b5 100644 --- a/pkg/models/allocation.go +++ b/pkg/models/allocation.go @@ -35,9 +35,9 @@ type ComputeAllocation struct { type ComputeAllocationResource struct { ID string `json:"id" db:"id"` - Name string `json:"name" db:"name"` // A human-readable name for the resource, e.g., "GPU B200", "CPU", "GPU RTX6000", etc. - ResourceType string `json:"resource_type" db:"resource_type"` // CPU, GPU, etc. - ResourceAmount int64 `json:"resource_amount" db:"resource_amount"` // Number of CPUs, GPUs, etc. allocated. + Name string `json:"name" db:"name"` // resource / partition name, e.g., "cpu", "gpu", etc. + ResourceType string `json:"resource_type" db:"resource_type"` // GrpTRES, GrpTRESMins + ResourceAmount int64 `json:"resource_amount" db:"resource_amount"` // Number of CPUs, GPUs, time in minutes, or other unit depending on the resource type. } type ComputeAllocationResourceMapping struct { @@ -95,11 +95,17 @@ type ComputeAllocationUsage struct { // Represents the usage of a compute alloca ComputeAllocationResourceID string `json:"compute_allocation_resource_id" db:"compute_allocation_resource_id"` // The specific resource consumed, e.g., 20 CPU hours, 10 GPU hours, etc. } +type ComputeAllocationMembershipResourceOverride struct { + ID string `json:"id" db:"id"` + ComputeAllocationMembershipID string `json:"compute_allocation_membership_id" db:"compute_allocation_membership_id"` + ComputeAllocationResourceID string `json:"compute_allocation_resource_id" db:"compute_allocation_resource_id"` + OverriddenResourceAmount int64 `json:"overridden_resource_amount" db:"overridden_resource_amount"` // The overridden SU amount for the user for this specific resource, e.g., 150 SUs for GPU hours, etc. 
+} + type ComputeAllocationMembership struct { ID string `json:"id" db:"id"` ComputeAllocationID string `json:"compute_allocation_id" db:"compute_allocation_id"` UserID string `json:"user_id" db:"user_id"` - AllocationAmount int64 `json:"allocation_amount" db:"allocation_amount"` // SUs allocated to the user, e.g., 100 CPU hours, 50 GPU hours, etc. StartTime time.Time `json:"start_time" db:"start_time"` EndTime time.Time `json:"end_time" db:"end_time"` MembershipStatus AllocationStatus `json:"membership_status" db:"membership_status"` // ACTIVE, INACTIVE, etc. diff --git a/pkg/service/compute_allocation_membership.go b/pkg/service/compute_allocation_membership.go index 9d6e0056d..f61bbdc06 100644 --- a/pkg/service/compute_allocation_membership.go +++ b/pkg/service/compute_allocation_membership.go @@ -154,33 +154,6 @@ func (s *Service) UpdateComputeAllocationMembership(ctx context.Context, m *mode return m, nil } -// UpdateMembershipAllocationAmount sets the SU sub-allocation granted to the -// user identified by the given membership ID. Other fields are preserved. 
-func (s *Service) UpdateMembershipAllocationAmount(ctx context.Context, id string, amount int64) (*models.ComputeAllocationMembership, error) { - if id == "" { - return nil, fmt.Errorf("%w: compute allocation membership id is required", ErrInvalidInput) - } - if amount < 0 { - return nil, fmt.Errorf("%w: allocation_amount must be non-negative", ErrInvalidInput) - } - existing, err := s.memberships.FindByID(ctx, id) - if err != nil { - return nil, fmt.Errorf("lookup compute allocation membership: %w", err) - } - if existing == nil { - return nil, ErrNotFound - } - existing.AllocationAmount = amount - if err := s.inTx(ctx, func(tx *sql.Tx) error { - return s.memberships.Update(ctx, tx, existing) - }); err != nil { - return nil, fmt.Errorf("update compute allocation membership amount: %w", err) - } - - s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, existing) - return existing, nil -} - // UpdateMembershipStatus sets the lifecycle status (ACTIVE, INACTIVE, etc.) // of the membership identified by the given ID. Other fields are preserved. func (s *Service) UpdateMembershipStatus(ctx context.Context, id string, status models.AllocationStatus) (*models.ComputeAllocationMembership, error) { diff --git a/pkg/service/compute_allocation_membership_resource_override.go b/pkg/service/compute_allocation_membership_resource_override.go new file mode 100644 index 000000000..81fadee40 --- /dev/null +++ b/pkg/service/compute_allocation_membership_resource_override.go @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// CreateComputeAllocationMembershipResourceOverride records a per-resource +// override of the SU amount granted to the membership. The referenced +// membership and resource must exist, and the (membership, resource) pair +// must not already have an override. +func (s *Service) CreateComputeAllocationMembershipResourceOverride(ctx context.Context, o *models.ComputeAllocationMembershipResourceOverride) (*models.ComputeAllocationMembershipResourceOverride, error) { + if o == nil { + return nil, fmt.Errorf("%w: membership resource override is nil", ErrInvalidInput) + } + if o.ComputeAllocationMembershipID == "" { + return nil, fmt.Errorf("%w: compute_allocation_membership_id is required", ErrInvalidInput) + } + if o.ComputeAllocationResourceID == "" { + return nil, fmt.Errorf("%w: compute_allocation_resource_id is required", ErrInvalidInput) + } + if o.OverriddenResourceAmount < 0 { + return nil, fmt.Errorf("%w: overridden_resource_amount must be non-negative", ErrInvalidInput) + } + + if m, err := s.memberships.FindByID(ctx, o.ComputeAllocationMembershipID); err != nil { + return nil, fmt.Errorf("lookup compute allocation membership: %w", err) + } else if m == nil { + return nil, fmt.Errorf("%w: compute allocation membership %q not found", + ErrInvalidInput, o.ComputeAllocationMembershipID) + } + if r, err := s.resources.FindByID(ctx, o.ComputeAllocationResourceID); err != nil 
{ + return nil, fmt.Errorf("lookup compute allocation resource: %w", err) + } else if r == nil { + return nil, fmt.Errorf("%w: compute allocation resource %q not found", + ErrInvalidInput, o.ComputeAllocationResourceID) + } + + if existing, err := s.membershipOverrides.FindByPair(ctx, o.ComputeAllocationMembershipID, o.ComputeAllocationResourceID); err != nil { + return nil, fmt.Errorf("lookup membership resource override: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: override already exists for membership %q resource %q", + ErrAlreadyExists, o.ComputeAllocationMembershipID, o.ComputeAllocationResourceID) + } + + if o.ID == "" { + o.ID = newID() + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.membershipOverrides.Create(ctx, tx, o) + }); err != nil { + return nil, fmt.Errorf("create membership resource override: %w", err) + } + + s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideCreateEvent, o) + return o, nil +} + +// GetComputeAllocationMembershipResourceOverride retrieves an override by ID. +func (s *Service) GetComputeAllocationMembershipResourceOverride(ctx context.Context, id string) (*models.ComputeAllocationMembershipResourceOverride, error) { + o, err := s.membershipOverrides.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get membership resource override: %w", err) + } + if o == nil { + return nil, ErrNotFound + } + return o, nil +} + +// GetComputeAllocationMembershipResourceOverrideByPair retrieves the override +// for the given (membership, resource) pair. 
+func (s *Service) GetComputeAllocationMembershipResourceOverrideByPair(ctx context.Context, membershipID, resourceID string) (*models.ComputeAllocationMembershipResourceOverride, error) { + if membershipID == "" { + return nil, fmt.Errorf("%w: compute_allocation_membership_id is required", ErrInvalidInput) + } + if resourceID == "" { + return nil, fmt.Errorf("%w: compute_allocation_resource_id is required", ErrInvalidInput) + } + o, err := s.membershipOverrides.FindByPair(ctx, membershipID, resourceID) + if err != nil { + return nil, fmt.Errorf("get membership resource override by pair: %w", err) + } + if o == nil { + return nil, ErrNotFound + } + return o, nil +} + +// ListOverridesForMembership returns every resource override recorded +// against the given membership. +func (s *Service) ListOverridesForMembership(ctx context.Context, membershipID string) ([]models.ComputeAllocationMembershipResourceOverride, error) { + if membershipID == "" { + return nil, fmt.Errorf("%w: compute_allocation_membership_id is required", ErrInvalidInput) + } + rows, err := s.membershipOverrides.FindByMembership(ctx, membershipID) + if err != nil { + return nil, fmt.Errorf("list overrides for membership: %w", err) + } + return rows, nil +} + +// ListOverridesForResource returns every membership override referencing the +// given resource. +func (s *Service) ListOverridesForResource(ctx context.Context, resourceID string) ([]models.ComputeAllocationMembershipResourceOverride, error) { + if resourceID == "" { + return nil, fmt.Errorf("%w: compute_allocation_resource_id is required", ErrInvalidInput) + } + rows, err := s.membershipOverrides.FindByResource(ctx, resourceID) + if err != nil { + return nil, fmt.Errorf("list overrides for resource: %w", err) + } + return rows, nil +} + +// UpdateComputeAllocationMembershipResourceOverride replaces mutable fields +// of an existing override. 
+func (s *Service) UpdateComputeAllocationMembershipResourceOverride(ctx context.Context, o *models.ComputeAllocationMembershipResourceOverride) (*models.ComputeAllocationMembershipResourceOverride, error) { + if o == nil || o.ID == "" { + return nil, fmt.Errorf("%w: membership resource override id is required", ErrInvalidInput) + } + if o.OverriddenResourceAmount < 0 { + return nil, fmt.Errorf("%w: overridden_resource_amount must be non-negative", ErrInvalidInput) + } + existing, err := s.membershipOverrides.FindByID(ctx, o.ID) + if err != nil { + return nil, fmt.Errorf("lookup membership resource override: %w", err) + } + if existing == nil { + return nil, ErrNotFound + } + if o.ComputeAllocationMembershipID == "" { + o.ComputeAllocationMembershipID = existing.ComputeAllocationMembershipID + } + if o.ComputeAllocationResourceID == "" { + o.ComputeAllocationResourceID = existing.ComputeAllocationResourceID + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.membershipOverrides.Update(ctx, tx, o) + }); err != nil { + return nil, fmt.Errorf("update membership resource override: %w", err) + } + + s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideUpdateEvent, o) + return o, nil +} + +// DeleteComputeAllocationMembershipResourceOverride removes an override by ID. 
+func (s *Service) DeleteComputeAllocationMembershipResourceOverride(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: membership resource override id is required", ErrInvalidInput) + } + existing, err := s.membershipOverrides.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup membership resource override: %w", err) + } + if existing == nil { + return ErrNotFound + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.membershipOverrides.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("delete membership resource override: %w", err) + } + + s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideDeleteEvent, existing) + return nil +} diff --git a/pkg/service/service.go b/pkg/service/service.go index e187ff82f..23ca9bc83 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -49,6 +49,7 @@ type Service struct { changeRequests store.ComputeAllocationChangeRequestStore changeEvents store.ComputeAllocationChangeRequestEventStore memberships store.ComputeAllocationMembershipStore + membershipOverrides store.ComputeAllocationMembershipResourceOverrideStore usages store.ComputeAllocationUsageStore } @@ -71,6 +72,7 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service { changeRequests: store.NewComputeAllocationChangeRequestStore(database), changeEvents: store.NewComputeAllocationChangeRequestEventStore(database), memberships: store.NewComputeAllocationMembershipStore(database), + membershipOverrides: store.NewComputeAllocationMembershipResourceOverrideStore(database), usages: store.NewComputeAllocationUsageStore(database), } } @@ -93,6 +95,7 @@ func NewWithStores( allocDiffs store.ComputeAllocationDiffStore, changeRequests store.ComputeAllocationChangeRequestStore, changeEvents store.ComputeAllocationChangeRequestEventStore, + membershipOverrides store.ComputeAllocationMembershipResourceOverrideStore, memberships store.ComputeAllocationMembershipStore, usages 
store.ComputeAllocationUsageStore, ) *Service { @@ -111,6 +114,7 @@ func NewWithStores( allocDiffs: allocDiffs, changeRequests: changeRequests, changeEvents: changeEvents, + membershipOverrides: membershipOverrides, memberships: memberships, usages: usages, }
